activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r887420 [2/3] - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk: ./ src/main/csharp/ src/main/csharp/Commands/ src/main/csharp/Threads/
Date Fri, 04 Dec 2009 22:53:42 GMT
Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Fri Dec  4 22:53:41 2009
@@ -0,0 +1,1087 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Util;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp
+{
+    /// <summary>
+    /// Kinds of acknowledgement a consumer can send.  The consumer stores the
+    /// value, cast to a byte, in MessageAck.AckType.
+    /// </summary>
+    public enum AckType
+    {
+        DeliveredAck = 0, // Message was delivered to the consumer but has not been consumed yet.
+        PoisonAck = 1, // Message could not be processed (poison pill); discard it anyway.
+        ConsumedAck = 2, // Message was consumed; it can be discarded.
+        RedeliveredAck = 3, // Message has been redelivered and is not yet considered poisoned.
+        IndividualAck = 4 // Only the given message is to be treated as consumed.
+    }
+
+    /// <summary>
+    /// An object capable of receiving messages from some destination
+    /// </summary>
+    public class MessageConsumer : IMessageConsumer, IDispatcher
+    {
+        // Dispatches that have arrived but have not yet been handed to the application.
+        private readonly MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
+        // Dispatches handed to the application and still awaiting acknowledgement.
+        // New entries are added at the head, so First is the most recent delivery
+        // and Last is the oldest.  Always accessed under lock(dispatchedMessages).
+        private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
+        private readonly ConsumerInfo info;
+        // Owning session; set to null by DoClose().
+        private Session session;
+
+        // Ack accumulated by AckLater() that has not been sent yet.
+        private MessageAck pendingAck = null;
+
+        private Atomic<bool> started = new Atomic<bool>();
+        // Guard flag taken with CompareAndSet(false, true) before a batch ack
+        // is assembled and sent.
+        private Atomic<bool> deliveringAcks = new Atomic<bool>();
+
+        // NOTE(review): exposed through MaximumRedeliveryCount / RedeliveryTimeout but
+        // not read by the rollback logic visible in this part of the file, which
+        // consults redeliveryPolicy instead -- confirm whether these are still used.
+        private int maximumRedeliveryCount = 10;
+        private int redeliveryTimeout = 500;
+        protected bool disposed = false;
+        // BrokerSequenceId of the message most recently passed to BeforeMessageIsConsumed().
+        private long lastDeliveredSequenceId = 0;
+        // Number of messages covered by the current pendingAck.
+        private int deliveredCounter = 0;
+        private int additionalWindowSize = 0;
+        private long redeliveryDelay = 0;
+        // Counts Dispatch() calls; Dispatch() sleeps briefly on every 1000th call.
+        private int dispatchedCount = 0;
+        private volatile bool synchronizationRegistered = false;
+        // Set by ClearMessagesInProgress(); acted upon and reset inside Dispatch().
+        private bool clearDispatchList = false;
+
+        // NOTE(review): neither constant is referenced in the part of the file
+        // visible here -- confirm they are used further down before relying on them.
+        private const int DEFAULT_REDELIVERY_DELAY = 0;
+        private const int DEFAULT_MAX_REDELIVERIES = 5;
+
+        // Backing field for the public Listener event.
+        private event MessageListener listener;
+
+        // Initialised from the connection's policy in the constructor.
+        private IRedeliveryPolicy redeliveryPolicy;
+
+        /// <summary>
+        /// Creates a consumer bound to the given session.  The constructor is
+        /// internal to prevent clients from creating an instance directly.
+        /// The redelivery policy is taken from the session's connection.
+        /// </summary>
+        /// <param name="session">The session that owns this consumer.</param>
+        /// <param name="info">The consumer description (id, destination, prefetch size).</param>
+        internal MessageConsumer(Session session, ConsumerInfo info)
+        {
+            this.session = session;
+            this.info = info;
+            this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+        }
+
+        /// <summary>
+        /// Finalizer: runs the non-disposing cleanup path, Dispose(false).
+        /// </summary>
+        ~MessageConsumer()
+        {
+            Dispose(false);
+        }
+
+        #region Property Accessors
+
+        /// <summary>
+        /// Broker sequence id of the message most recently passed to
+        /// BeforeMessageIsConsumed().
+        /// </summary>
+        public long LastDeliveredSequenceId
+        {
+            get { return this.lastDeliveredSequenceId; }
+        }
+
+        /// <summary>
+        /// The id of this consumer, as recorded in its ConsumerInfo.
+        /// </summary>
+        public ConsumerId ConsumerId
+        {
+            get { return info.ConsumerId; }
+        }
+
+        /// <summary>
+        /// Gets or sets the stored maximum redelivery count.
+        /// NOTE(review): the rollback code visible in this file reads the limit
+        /// from RedeliveryPolicy instead -- confirm whether this value is used.
+        /// </summary>
+        public int MaximumRedeliveryCount
+        {
+            get { return maximumRedeliveryCount; }
+            set { maximumRedeliveryCount = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the stored redelivery timeout.
+        /// NOTE(review): presumably milliseconds; the unit is not established by
+        /// the code visible here -- confirm.
+        /// </summary>
+        public int RedeliveryTimeout
+        {
+            get { return redeliveryTimeout; }
+            set { redeliveryTimeout = value; }
+        }
+
+        /// <summary>
+        /// The prefetch size from this consumer's ConsumerInfo.  A value of zero
+        /// makes the Receive methods send an explicit pull request and forbids
+        /// attaching an asynchronous Listener.
+        /// </summary>
+        public int PrefetchSize
+        {
+            get { return this.info.PrefetchSize; }
+        }
+
+        /// <summary>
+        /// Gets or sets the redelivery policy.  It is initialised from the
+        /// connection's policy when the consumer is constructed.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set { this.redeliveryPolicy = value; }
+        }
+
+        #endregion
+
+        #region IMessageConsumer Members
+
+        /// <summary>
+        /// Asynchronous message listener.  Adding a handler requires an open
+        /// consumer with a non-zero prefetch size.  If the session is running it
+        /// is stopped while the handler is attached and the already-queued
+        /// messages are redispatched, and it is started again afterwards.
+        /// </summary>
+        /// <exception cref="NMSException">
+        /// Thrown when a handler is added to a consumer whose prefetch size is zero.
+        /// </exception>
+        public event MessageListener Listener
+        {
+            add
+            {
+                CheckClosed();
+
+                if(this.PrefetchSize == 0)
+                {
+                    throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size");
+                }
+
+                bool wasStarted = this.session.Started;
+
+                if(wasStarted)
+                {
+                    this.session.Stop();
+                }
+
+                try
+                {
+                    listener += value;
+                    this.session.Redispatch(this.unconsumedMessages);
+                }
+                finally
+                {
+                    // Restart even when the redispatch fails; otherwise an exception
+                    // here would leave a previously running session stopped.
+                    if(wasStarted)
+                    {
+                        this.session.Start();
+                    }
+                }
+            }
+            remove { listener -= value; }
+        }
+
+        /// <summary>
+        /// Synchronously receives the next message, waiting with a timeout of -1
+        /// milliseconds on the unconsumed channel.  Not allowed while a listener
+        /// is registered or after the consumer is closed.
+        /// </summary>
+        /// <returns>The received message, or null when the dequeue yields nothing.</returns>
+        public IMessage Receive()
+        {
+            CheckClosed();
+            CheckMessageListener();
+
+            // Only has an effect for zero-prefetch consumers with an empty channel.
+            SendPullRequest(0);
+
+            MessageDispatch received = this.Dequeue(TimeSpan.FromMilliseconds(-1));
+
+            if(received != null)
+            {
+                BeforeMessageIsConsumed(received);
+                AfterMessageIsConsumed(received, false);
+
+                return CreateActiveMQMessage(received);
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Synchronously receives the next message, waiting at most the given
+        /// time.  For a zero-prefetch consumer the timeout is forwarded in the
+        /// pull request and the local wait uses a timeout of -1 milliseconds.
+        /// </summary>
+        /// <param name="timeout">How long to wait for a message.</param>
+        /// <returns>The received message, or null when the dequeue yields nothing.</returns>
+        public IMessage Receive(TimeSpan timeout)
+        {
+            CheckClosed();
+            CheckMessageListener();
+
+            SendPullRequest((long) timeout.TotalMilliseconds);
+
+            TimeSpan localWait = (this.PrefetchSize == 0) ? TimeSpan.FromMilliseconds(-1) : timeout;
+            MessageDispatch received = this.Dequeue(localWait);
+
+            if(received != null)
+            {
+                BeforeMessageIsConsumed(received);
+                AfterMessageIsConsumed(received, false);
+
+                return CreateActiveMQMessage(received);
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Receives a message if one is available.  A prefetching consumer polls
+        /// the unconsumed channel with a zero timeout; a zero-prefetch consumer
+        /// sends a pull request with timeout -1 and then waits on the channel
+        /// with a timeout of -1 milliseconds.
+        /// </summary>
+        /// <returns>The received message, or null when the dequeue yields nothing.</returns>
+        public IMessage ReceiveNoWait()
+        {
+            CheckClosed();
+            CheckMessageListener();
+
+            SendPullRequest(-1);
+
+            TimeSpan localWait = (this.PrefetchSize == 0) ? TimeSpan.FromMilliseconds(-1) : TimeSpan.Zero;
+            MessageDispatch received = this.Dequeue(localWait);
+
+            if(received != null)
+            {
+                BeforeMessageIsConsumed(received);
+                AfterMessageIsConsumed(received, false);
+
+                return CreateActiveMQMessage(received);
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Releases the consumer through the disposing path and suppresses the
+        /// finalizer, since cleanup has already been performed.
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Shared cleanup for Dispose() and the finalizer.  Closes the consumer
+        /// once; any exception raised by Close() is deliberately ignored.
+        /// </summary>
+        /// <param name="disposing">
+        /// True when called from Dispose(), false when called from the finalizer.
+        /// Both paths currently perform the same work.
+        /// </param>
+        protected void Dispose(bool disposing)
+        {
+            if(!disposed)
+            {
+                // No managed-only cleanup is needed for the disposing case.
+                try
+                {
+                    Close();
+                }
+                catch
+                {
+                    // Ignore network errors.
+                }
+
+                disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Closes the consumer.  Inside an active transaction the actual close
+        /// is registered as a transaction synchronization; otherwise it happens
+        /// immediately.  Does nothing if the unconsumed channel is already closed.
+        /// </summary>
+        public void Close()
+        {
+            if(this.unconsumedMessages.Closed)
+            {
+                return;
+            }
+
+            bool insideTransaction = this.session.IsTransacted && this.session.TransactionContext.InTransaction;
+
+            if(insideTransaction)
+            {
+                this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this));
+                return;
+            }
+
+            this.DoClose();
+        }
+
+        /// <summary>
+        /// Performs the actual shutdown: flushes outstanding acks for
+        /// non-transacted sessions, closes the unconsumed channel, detaches the
+        /// consumer from the session, tells the broker to remove it, and drops
+        /// the session reference.  Does nothing if already closed.
+        /// </summary>
+        internal void DoClose()
+        {
+            if(this.unconsumedMessages.Closed)
+            {
+                return;
+            }
+
+            Tracer.Debug("Closing down the Consumer");
+
+            if(!this.session.IsTransacted)
+            {
+                // Send out whatever acks are still owed before going away.
+                DeliverAcks();
+                if(this.IsAutoAcknowledgeBatch)
+                {
+                    Acknowledge();
+                }
+
+                lock(this.dispatchedMessages)
+                {
+                    dispatchedMessages.Clear();
+                }
+            }
+
+            this.unconsumedMessages.Close();
+            this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
+
+            RemoveInfo removal = new RemoveInfo();
+            removal.ObjectId = this.info.ConsumerId;
+            removal.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+
+            this.session.Connection.Oneway(removal);
+            this.session = null;
+
+            Tracer.Debug("Consumer instnace Closed.");
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Sends a MessagePull to the broker, but only for a zero-prefetch
+        /// consumer whose unconsumed channel is currently empty.
+        /// </summary>
+        /// <param name="timeout">Timeout value placed in the pull request.</param>
+        protected void SendPullRequest(long timeout)
+        {
+            if(this.info.PrefetchSize != 0 || !this.unconsumedMessages.Empty)
+            {
+                return;
+            }
+
+            MessagePull pull = new MessagePull();
+            pull.ConsumerId = this.info.ConsumerId;
+            pull.Destination = this.info.Destination;
+            pull.Timeout = timeout;
+            pull.ResponseRequired = false;
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("Sending MessagePull: " + pull);
+            }
+
+            session.Connection.Oneway(pull);
+        }
+
+        /// <summary>
+        /// Acknowledges a single message: finds its dispatch in the dispatched
+        /// list by message id, removes it, and sends an IndividualAck for it.
+        /// Logs and returns if the message is not in the list.
+        /// </summary>
+        /// <param name="message">The message to acknowledge.</param>
+        protected void DoIndividualAcknowledge(Message message)
+        {
+            MessageDispatch match = null;
+
+            lock(this.dispatchedMessages)
+            {
+                LinkedListNode<MessageDispatch> node = this.dispatchedMessages.First;
+
+                while(node != null && match == null)
+                {
+                    MessageDispatch candidate = node.Value;
+
+                    if(candidate.Message.MessageId.Equals(message.MessageId))
+                    {
+                        match = candidate;
+                        this.dispatchedMessages.Remove(candidate);
+                    }
+                    else
+                    {
+                        node = node.Next;
+                    }
+                }
+            }
+
+            if(match == null)
+            {
+                Tracer.DebugFormat("Attempt to Ack MessageId[{0}] failed because the original dispatch is not in the Dispatch List", message.MessageId);
+                return;
+            }
+
+            MessageAck individualAck = new MessageAck();
+
+            individualAck.AckType = (byte) AckType.IndividualAck;
+            individualAck.ConsumerId = this.info.ConsumerId;
+            individualAck.Destination = match.Destination;
+            individualAck.LastMessageId = match.Message.MessageId;
+            individualAck.MessageCount = 1;
+
+            Tracer.Debug("Sending Individual Ack for MessageId: " + individualAck.LastMessageId.ToString());
+            this.session.Connection.Oneway(individualAck);
+        }
+
+        /// <summary>
+        /// Acknowledge handler that intentionally does nothing.
+        /// </summary>
+        /// <param name="message">The message being acknowledged (ignored).</param>
+        protected void DoNothingAcknowledge(Message message)
+        {
+        }
+
+        /// <summary>
+        /// Acknowledge handler for client-acknowledge mode: after checking the
+        /// consumer is open, delegates to the session's Acknowledge().
+        /// </summary>
+        /// <param name="message">The message on which acknowledge was requested (not used directly).</param>
+        protected void DoClientAcknowledge(Message message)
+        {
+            this.CheckClosed();
+            Tracer.Debug("Sending Client Ack:");
+            this.session.Acknowledge();
+        }
+
+        /// <summary>
+        /// Marks the consumer as started, starts the unconsumed channel and
+        /// wakes the session executor.  Has no effect once the channel is closed.
+        /// </summary>
+        public void Start()
+        {
+            if(!this.unconsumedMessages.Closed)
+            {
+                this.started.Value = true;
+                this.unconsumedMessages.Start();
+                this.session.Executor.Wakeup();
+            }
+        }
+
+        /// <summary>
+        /// Marks the consumer as not started and stops the unconsumed channel.
+        /// </summary>
+        public void Stop()
+        {
+            this.started.Value = false;
+            this.unconsumedMessages.Stop();
+        }
+
+        /// <summary>
+        /// Requests that the in-progress messages be discarded.  Only sets a
+        /// flag; the actual clearing is done by the next call to Dispatch().
+        /// </summary>
+        public void ClearMessagesInProgress()
+        {
+            // We are called from inside the transport reconnection logic, which
+            // clears the dispatch lists of all the connection's consumers.
+            // Rather than trying to grab a mutex (which could already be owned
+            // by a message listener that is calling send) we just set a flag so
+            // that the list can be cleared as soon as the dispatch thread is
+            // ready to flush the dispatch list.
+            this.clearDispatchList = true;
+        }
+
+        /// <summary>
+        /// Sends any acknowledgement that is currently owed to the broker.
+        /// In auto-acknowledge-each mode this is a DeliveredAck covering every
+        /// dispatched message (or, if there are none, the pending ack); in the
+        /// other modes it is the pending ack, but only if that is a ConsumedAck.
+        /// A failure to send is logged and not propagated.
+        /// </summary>
+        public void DeliverAcks()
+        {
+            // Only one caller at a time may assemble and send the ack.
+            if(!this.deliveringAcks.CompareAndSet(false, true))
+            {
+                return;
+            }
+
+            try
+            {
+                MessageAck ack = null;
+
+                if(this.IsAutoAcknowledgeEach)
+                {
+                    lock(this.dispatchedMessages)
+                    {
+                        ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
+                        if(ack != null)
+                        {
+                            this.dispatchedMessages.Clear();
+                        }
+                        else
+                        {
+                            ack = this.pendingAck;
+                            this.pendingAck = null;
+                        }
+                    }
+                }
+                else if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
+                {
+                    ack = pendingAck;
+                    pendingAck = null;
+                }
+
+                if(ack != null)
+                {
+                    try
+                    {
+                        this.session.Connection.Oneway(ack);
+                    }
+                    catch(Exception e)
+                    {
+                        Tracer.DebugFormat("{0} : Failed to send ack, {1}", this.info.ConsumerId, e);
+                    }
+                }
+            }
+            finally
+            {
+                // Always release the guard.  Previously it was released only when
+                // there was nothing to send, so the first ack that was actually
+                // sent left the flag set and blocked every later ack delivery.
+                this.deliveringAcks.Value = false;
+            }
+        }
+
+        /// <summary>
+        /// Entry point for a message dispatch arriving for this consumer.  If a
+        /// listener is registered and the unconsumed channel is running, the
+        /// message is delivered to the listener right away; otherwise the
+        /// dispatch is queued on the unconsumed channel.  Nothing is done with
+        /// the dispatch when the channel is closed.  Exceptions escaping the
+        /// method body are reported to the connection via OnSessionException.
+        /// </summary>
+        /// <param name="dispatch">The dispatch to deliver or queue.</param>
+        public void Dispatch(MessageDispatch dispatch)
+        {
+            // Snapshot the delegate so it cannot become null between the check
+            // and the invocation below.
+            MessageListener listener = this.listener;
+
+            try
+            {
+                lock(this.unconsumedMessages.SyncRoot)
+                {
+                    if(this.clearDispatchList)
+                    {
+                        // we are reconnecting so lets flush the in progress messages
+                        this.clearDispatchList = false;
+                        this.unconsumedMessages.Clear();
+
+                        if(this.pendingAck != null && this.pendingAck.AckType == (byte) AckType.DeliveredAck)
+                        {
+                            // on resumption a pending delivered ack will be out of sync with
+                            // re-deliveries.
+                            if(Tracer.IsDebugEnabled)
+                            {
+                                Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
+                            }
+                            this.pendingAck = null;
+                        }
+                    }
+
+                    if(!this.unconsumedMessages.Closed)
+                    {
+                        if(listener != null && this.unconsumedMessages.Running)
+                        {
+                            // NOTE(review): Iterate() builds its message with CreateStompMessage
+                            // while this path uses CreateActiveMQMessage / ActiveMQMessage;
+                            // neither is defined in the visible part of the file -- confirm.
+                            ActiveMQMessage message = CreateActiveMQMessage(dispatch);
+
+                            this.BeforeMessageIsConsumed(dispatch);
+
+                            try
+                            {
+                                bool expired = message.IsExpired();
+
+                                // Expired messages are not shown to the listener but still
+                                // go through the after-consume bookkeeping as expired.
+                                if(!expired)
+                                {
+                                    listener(message);
+                                }
+
+                                this.AfterMessageIsConsumed(dispatch, expired);
+                            }
+                            catch(Exception e)
+                            {
+                                if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || this.session.IsIndividualAcknowledge)
+                                {
+                                    // Redeliver the message
+                                    // NOTE(review): no code runs here; the dispatch is simply not
+                                    // passed to AfterMessageIsConsumed in these modes.
+                                }
+                                else
+                                {
+                                    // Transacted or Client ack: Deliver the next message.
+                                    this.AfterMessageIsConsumed(dispatch, false);
+                                }
+
+                                Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
+                            }
+                        }
+                        else
+                        {
+                            // No listener (or channel not running): keep it for a later
+                            // Receive call or Iterate pass.
+                            this.unconsumedMessages.Enqueue(dispatch);
+                        }
+                    }
+                }
+
+                // Every 1000th dispatch, reset the counter and sleep for 1 ms
+                // (presumably to give other threads a chance to run).
+                if(++dispatchedCount % 1000 == 0)
+                {
+                    dispatchedCount = 0;
+                    Thread.Sleep(1);
+                }
+            }
+            catch(Exception e)
+            {
+                this.session.Connection.OnSessionException(this.session, e);
+            }
+        }
+
+        /// <summary>
+        /// Delivers at most one queued message to the registered listener.
+        /// An NMSException raised while delivering is reported to the connection
+        /// via OnSessionException.
+        /// </summary>
+        /// <returns>
+        /// True if a dispatch was taken from the unconsumed channel (even if its
+        /// delivery failed); false if there is no listener or nothing was queued.
+        /// </returns>
+        public bool Iterate()
+        {
+            // Snapshot the delegate: the event field can be cleared by a
+            // concurrent "remove" between the null check and the invocation.
+            MessageListener handler = this.listener;
+
+            if(handler == null)
+            {
+                return false;
+            }
+
+            MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
+
+            if(dispatch == null)
+            {
+                return false;
+            }
+
+            try
+            {
+                Message message = CreateStompMessage(dispatch);
+                BeforeMessageIsConsumed(dispatch);
+                handler(message);
+                AfterMessageIsConsumed(dispatch, false);
+            }
+            catch(NMSException e)
+            {
+                this.session.Connection.OnSessionException(this.session, e);
+            }
+
+            return true;
+        }
+
+        /// <summary>
+        /// Gets the next non-expired dispatch from the unconsumedMessages channel.
+        /// The timeout is handed to the channel's own Dequeue: a positive value
+        /// bounds the total time spent here, zero means a single non-blocking
+        /// attempt, and a negative value (Timeout.Infinite) is passed through
+        /// unchanged (presumably an unbounded wait -- see MessageDispatchChannel).
+        /// Expired messages are consumed and acknowledged by this method and
+        /// are never returned.
+        /// </summary>
+        /// <param name="timeout">
+        /// A <see cref="System.TimeSpan"/> giving the maximum time to wait.
+        /// </param>
+        /// <returns>
+        /// A <see cref="MessageDispatch"/>, or null when the channel yields
+        /// nothing within the allowed time, the channel is closed, or the
+        /// dispatch received carries no message.
+        /// </returns>
+        private MessageDispatch Dequeue(TimeSpan timeout)
+        {
+            // Use UTC so that a daylight-saving or time-zone shift of the local
+            // clock cannot shorten or lengthen the wait.
+            DateTime deadline = DateTime.UtcNow;
+
+            if(timeout > TimeSpan.Zero)
+            {
+                deadline += timeout;
+            }
+
+            while(true)
+            {
+                MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);
+
+                if(dispatch == null)
+                {
+                    if(timeout <= TimeSpan.Zero || this.unconsumedMessages.Closed)
+                    {
+                        return null;
+                    }
+
+                    // A timed wait came back empty: retry with whatever time is
+                    // left; once that reaches zero the next empty result returns null.
+                    DateTime now = DateTime.UtcNow;
+                    timeout = (now > deadline) ? TimeSpan.Zero : deadline - now;
+                    continue;
+                }
+
+                if(dispatch.Message == null)
+                {
+                    return null;
+                }
+
+                if(!dispatch.Message.IsExpired())
+                {
+                    return dispatch;
+                }
+
+                Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
+
+                // Run the expired message through the normal bookkeeping so the
+                // broker is told about it, then look for the next one.
+                BeforeMessageIsConsumed(dispatch);
+                AfterMessageIsConsumed(dispatch, true);
+
+                if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
+                {
+                    // Take the time only after the bookkeeping above has run.
+                    DateTime now = DateTime.UtcNow;
+                    timeout = (now > deadline) ? TimeSpan.Zero : deadline - now;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Bookkeeping done just before a message is handed to the application:
+        /// records its broker sequence id and, unless the consumer is in
+        /// auto-acknowledge-batch mode, puts the dispatch at the head of the
+        /// dispatched list.  In a transacted session a DeliveredAck is also
+        /// scheduled through AckLater.
+        /// </summary>
+        /// <param name="dispatch">The dispatch about to be consumed.</param>
+        public void BeforeMessageIsConsumed(MessageDispatch dispatch)
+        {
+            this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
+
+            if(IsAutoAcknowledgeBatch)
+            {
+                return;
+            }
+
+            lock(this.dispatchedMessages)
+            {
+                this.dispatchedMessages.AddFirst(dispatch);
+            }
+
+            if(!this.session.IsTransacted)
+            {
+                return;
+            }
+
+            this.AckLater(dispatch, AckType.DeliveredAck);
+        }
+
+        /// <summary>
+        /// Bookkeeping done after a message has been handed to the application
+        /// (or has been found to be expired).  Depending on the acknowledgement
+        /// mode it sends an ack immediately, schedules one through AckLater, or
+        /// does nothing.  Has no effect once the unconsumed channel is closed.
+        /// </summary>
+        /// <param name="dispatch">The dispatch that was consumed.</param>
+        /// <param name="expired">True if the message was expired and was not delivered.</param>
+        /// <exception cref="NMSException">
+        /// Thrown when the session is in none of the known acknowledgement modes.
+        /// </exception>
+        public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired)
+        {
+            if(this.unconsumedMessages.Closed)
+            {
+                return;
+            }
+
+            if(expired)
+            {
+                // An expired message is never acked by the application, so drop
+                // it from the dispatched list and just report the delivery.
+                lock(this.dispatchedMessages)
+                {
+                    this.dispatchedMessages.Remove(dispatch);
+                }
+
+                AckLater(dispatch, AckType.DeliveredAck);
+                return;
+            }
+
+            if(this.session.IsTransacted)
+            {
+                // Do nothing.
+            }
+            else if(this.IsAutoAcknowledgeEach)
+            {
+                if(this.deliveringAcks.CompareAndSet(false, true))
+                {
+                    try
+                    {
+                        lock(this.dispatchedMessages)
+                        {
+                            if(this.dispatchedMessages.Count != 0)
+                            {
+                                MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+                                if(ack != null)
+                                {
+                                    this.dispatchedMessages.Clear();
+                                    this.session.Connection.Oneway(ack);
+                                }
+                            }
+                        }
+                    }
+                    finally
+                    {
+                        // Release the guard even if sending throws; otherwise no
+                        // ack could ever be delivered again by this consumer.
+                        this.deliveringAcks.Value = false;
+                    }
+                }
+            }
+            else if(this.IsAutoAcknowledgeBatch)
+            {
+                AckLater(dispatch, AckType.ConsumedAck);
+            }
+            else if(this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
+            {
+                // Still being in the dispatched list means the application has
+                // not acknowledged the message while handling it.
+                bool stillAwaitingAck = false;
+
+                lock(this.dispatchedMessages)
+                {
+                    stillAwaitingAck = this.dispatchedMessages.Contains(dispatch);
+                }
+
+                if(stillAwaitingAck)
+                {
+                    AckLater(dispatch, AckType.DeliveredAck);
+                }
+            }
+            else
+            {
+                throw new NMSException("Invalid session state.");
+            }
+        }
+
+        /// <summary>
+        /// Builds a single ack of the given type that covers every entry in the
+        /// dispatched list, from the oldest (tail) to the newest (head).  The
+        /// list itself is not modified.
+        /// </summary>
+        /// <param name="type">The ack type to stamp on the result.</param>
+        /// <returns>The ack, or null when the dispatched list is empty.</returns>
+        private MessageAck MakeAckForAllDeliveredMessages(AckType type)
+        {
+            lock(this.dispatchedMessages)
+            {
+                if(this.dispatchedMessages.Count != 0)
+                {
+                    MessageDispatch newest = this.dispatchedMessages.First.Value;
+                    MessageAck coveringAck = new MessageAck();
+
+                    coveringAck.AckType = (byte) type;
+                    coveringAck.ConsumerId = this.info.ConsumerId;
+                    coveringAck.Destination = newest.Destination;
+                    coveringAck.LastMessageId = newest.Message.MessageId;
+                    coveringAck.MessageCount = this.dispatchedMessages.Count;
+                    coveringAck.FirstMessageId = this.dispatchedMessages.Last.Value.Message.MessageId;
+
+                    return coveringAck;
+                }
+
+                return null;
+            }
+        }
+
+        /// <summary>
+        /// Folds the given dispatch into the pending ack instead of sending an
+        /// ack for it straight away.  The pending ack is sent once the number of
+        /// messages it covers reaches half the prefetch size.
+        /// </summary>
+        /// <param name="dispatch">The dispatch to include in the pending ack.</param>
+        /// <param name="type">The ack type for the new pending ack.</param>
+        private void AckLater(MessageDispatch dispatch, AckType type)
+        {
+            // Don't acknowledge now, but we may need to let the broker know the
+            // consumer got the message to expand the pre-fetch window
+            if(this.session.IsTransacted)
+            {
+                this.session.DoStartTransaction();
+
+                // Register the transaction synchronization only once.
+                if(!synchronizationRegistered)
+                {
+                    this.synchronizationRegistered = true;
+                    this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this));
+                }
+            }
+
+            this.deliveredCounter++;
+
+            MessageAck oldPendingAck = pendingAck;
+
+            // Always build a fresh pending ack ending at this message; its start
+            // is decided below from the ack it replaces.
+            pendingAck = new MessageAck();
+            pendingAck.AckType = (byte) type;
+            pendingAck.ConsumerId = this.info.ConsumerId;
+            pendingAck.Destination = dispatch.Destination;
+            pendingAck.LastMessageId = dispatch.Message.MessageId;
+            pendingAck.MessageCount = deliveredCounter;
+
+            if(this.session.IsTransacted && this.session.TransactionContext.InTransaction)
+            {
+                pendingAck.TransactionId = this.session.TransactionContext.TransactionId;
+            }
+
+            if(oldPendingAck == null)
+            {
+                // First message of a new range.
+                pendingAck.FirstMessageId = pendingAck.LastMessageId;
+            }
+            else if(oldPendingAck.AckType == pendingAck.AckType)
+            {
+                // Same kind of ack: extend the existing range.
+                pendingAck.FirstMessageId = oldPendingAck.FirstMessageId;
+            }
+            else
+            {
+                // The old pending ack is being superseded by an ack of another type.  If it
+                // is not a delivered ack, and hence important, send it now so it is not lost.
+                if(oldPendingAck.AckType != (byte) AckType.DeliveredAck)
+                {
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+                    }
+
+                    this.session.Connection.Oneway(oldPendingAck);
+                }
+                else
+                {
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+                    }
+                }
+            }
+
+            // Flush once at least half of the prefetch window has been delivered,
+            // then start counting afresh.
+            if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize))
+            {
+                this.session.Connection.Oneway(pendingAck);
+                this.pendingAck = null;
+                this.deliveredCounter = 0;
+                this.additionalWindowSize = 0;
+            }
+        }
+
+        /// <summary>
+        /// Sends a ConsumedAck covering every message in the dispatched list and
+        /// discards the pending ack.  In a transacted session the ack carries
+        /// the transaction id and the dispatched list is kept; otherwise the
+        /// list is cleared.  Does nothing when the list is empty.
+        /// </summary>
+        internal void Acknowledge()
+        {
+            lock(this.dispatchedMessages)
+            {
+                MessageAck consumedAck = MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+
+                if(consumedAck != null)
+                {
+                    if(this.session.IsTransacted)
+                    {
+                        this.session.DoStartTransaction();
+                        consumedAck.TransactionId = this.session.TransactionContext.TransactionId;
+                    }
+
+                    this.session.Connection.Oneway(consumedAck);
+                    this.pendingAck = null;
+
+                    // Take the acknowledged messages out of both counters,
+                    // never letting either drop below zero.
+                    int ackedCount = this.dispatchedMessages.Count;
+                    this.deliveredCounter = Math.Max(0, this.deliveredCounter - ackedCount);
+                    this.additionalWindowSize = Math.Max(0, this.additionalWindowSize - ackedCount);
+
+                    if(!this.session.IsTransacted)
+                    {
+                        this.dispatchedMessages.Clear();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Commit-side bookkeeping: forgets all dispatched messages and resets
+        /// the redelivery delay to zero.
+        /// </summary>
+        private void Commit()
+        {
+            lock(this.dispatchedMessages)
+            {
+                this.dispatchedMessages.Clear();
+            }
+
+            this.redeliveryDelay = 0;
+        }
+
+        /// <summary>
+        /// Handles a rollback for this consumer. Every message currently tracked in
+        /// dispatchedMessages is notified of the rollback and then either reported to
+        /// the broker with a PoisonAck (when the redelivery limit is exceeded) or put
+        /// back at the front of unconsumedMessages for local redelivery, optionally
+        /// after a delay obtained from the redelivery policy.
+        /// </summary>
+        private void Rollback()
+        {
+            // Lock order: unconsumedMessages.SyncRoot first, then dispatchedMessages.
+            lock(this.unconsumedMessages.SyncRoot)
+            {
+                lock(this.dispatchedMessages)
+                {
+                    // Nothing was dispatched, so there is nothing to roll back. Note that
+                    // this early return also skips the Redispatch call at the end.
+                    if(this.dispatchedMessages.Count == 0)
+                    {
+                        return;
+                    }
+
+                    // Only increase the redelivery delay after the first redelivery..
+                    // NOTE(review): the naming (lastMd = First, firstMsgId = Last) suggests the
+                    // list is kept newest-first - confirm against where messages are added.
+                    MessageDispatch lastMd = this.dispatchedMessages.First.Value;
+                    int currentRedeliveryCount = lastMd.Message.RedeliveryCounter;
+
+                    // The delay is derived from the counter value read *before* the
+                    // OnMessageRollback calls below.
+                    redeliveryDelay = this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
+
+                    MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId;
+
+                    foreach(MessageDispatch dispatch in this.dispatchedMessages)
+                    {
+                        // Allow the message to update its internal state to reflect a Rollback.
+                        dispatch.Message.OnMessageRollback();
+                    }
+
+                    // A negative MaximumRedeliveries disables the limit check entirely.
+                    // RedeliveryCounter is re-read here, i.e. after OnMessageRollback ran.
+                    if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
+                       lastMd.Message.RedeliveryCounter > this.redeliveryPolicy.MaximumRedeliveries)
+                    {
+                        // We need to NACK the messages so that they get sent to the DLQ.
+                        MessageAck ack = new MessageAck();
+
+                        ack.AckType = (byte) AckType.PoisonAck;
+                        ack.ConsumerId = this.info.ConsumerId;
+                        ack.Destination = lastMd.Destination;
+                        ack.LastMessageId = lastMd.Message.MessageId;
+                        ack.MessageCount = this.dispatchedMessages.Count;
+                        ack.FirstMessageId = firstMsgId;
+
+                        this.session.Connection.Oneway(ack);
+
+                        // Adjust the window size.
+                        additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
+
+                        // The messages are not redelivered locally, so no back-off is needed.
+                        this.redeliveryDelay = 0;
+                    }
+                    else
+                    {
+                        // We only send a RedeliveryAck after the first redelivery
+                        if(currentRedeliveryCount > 0)
+                        {
+                            MessageAck ack = new MessageAck();
+
+                            ack.AckType = (byte) AckType.RedeliveredAck;
+                            ack.ConsumerId = this.info.ConsumerId;
+                            ack.Destination = lastMd.Destination;
+                            ack.LastMessageId = lastMd.Message.MessageId;
+                            ack.MessageCount = this.dispatchedMessages.Count;
+                            ack.FirstMessageId = firstMsgId;
+
+                            this.session.Connection.Oneway(ack);
+                        }
+
+                        // stop the delivery of messages.
+                        this.unconsumedMessages.Stop();
+
+                        // Put every rolled-back dispatch back at the front of the local queue.
+                        foreach(MessageDispatch dispatch in this.dispatchedMessages)
+                        {
+                            this.unconsumedMessages.EnqueueFirst(dispatch);
+                        }
+
+                        if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed)
+                        {
+                            // Restart later: RollbackHelper runs on a thread pool thread,
+                            // waits until the deadline and then calls Start().
+                            DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
+                            ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
+                        }
+                        else
+                        {
+                            // No delay requested (or the queue is closed): restart right away.
+                            Start();
+                        }
+                    }
+
+                    // The rolled-back messages no longer count as delivered or dispatched.
+                    this.deliveredCounter -= this.dispatchedMessages.Count;
+                    this.dispatchedMessages.Clear();
+                }
+            }
+
+            // Only redispatch if there's an async listener otherwise a synchronous
+            // consumer will pull them from the local queue.
+            if(this.listener != null)
+            {
+                this.session.Redispatch(this.unconsumedMessages);
+            }
+        }
+
+        /// <summary>
+        /// Thread pool callback queued by Rollback. Sleeps until the deadline passed
+        /// in <paramref name="arg"/> has been reached and then calls Start().
+        /// </summary>
+        /// <param name="arg">The boxed DateTime deadline (local time) to wait for.</param>
+        private void RollbackHelper(Object arg)
+        {
+            try
+            {
+                DateTime deadline = (DateTime) arg;
+                TimeSpan remaining = deadline - DateTime.Now;
+
+                // Only wait when the deadline still lies in the future.
+                if(remaining > TimeSpan.Zero)
+                {
+                    Thread.Sleep(remaining);
+                }
+
+                this.Start();
+            }
+            catch(Exception error)
+            {
+                // Failures are not reported once the consumer's queue is closed.
+                if(this.unconsumedMessages.Closed)
+                {
+                    return;
+                }
+
+                this.session.Connection.OnSessionException(this.session, error);
+            }
+        }
+
+        /// <summary>
+        /// Builds the message instance handed out for a dispatch: a clone of the
+        /// dispatched message, bound to this session's connection and wired to the
+        /// acknowledge handler that matches the session's acknowledgement mode.
+        /// </summary>
+        /// <param name="dispatch">The dispatch whose message is cloned.</param>
+        /// <returns>The cloned message.</returns>
+        private Message CreateStompMessage(MessageDispatch dispatch)
+        {
+            Message copy = dispatch.Message.Clone() as Message;
+
+            copy.Connection = this.session.Connection;
+
+            // Choose the handler first, then attach it in a single place.
+            AcknowledgeHandler handler;
+
+            if(this.session.IsClientAcknowledge)
+            {
+                handler = new AcknowledgeHandler(DoClientAcknowledge);
+            }
+            else if(this.session.IsIndividualAcknowledge)
+            {
+                handler = new AcknowledgeHandler(DoIndividualAcknowledge);
+            }
+            else
+            {
+                handler = new AcknowledgeHandler(DoNothingAcknowledge);
+            }
+
+            copy.Acknowledger += handler;
+
+            return copy;
+        }
+
+        /// <summary>
+        /// Throws an NMSException when the unconsumed message queue reports that
+        /// it has been closed; otherwise does nothing.
+        /// </summary>
+        private void CheckClosed()
+        {
+            if(!this.unconsumedMessages.Closed)
+            {
+                return;
+            }
+
+            throw new NMSException("The Consumer has been Closed");
+        }
+
+        /// <summary>
+        /// Throws an NMSException when a listener is currently installed on this
+        /// consumer; otherwise does nothing. This method only tests the listener -
+        /// judging by the message, callers are presumably expected to invoke it
+        /// when the prefetch limit is zero (confirm at the call sites).
+        /// </summary>
+        private void CheckMessageListener()
+        {
+            if(this.listener == null)
+            {
+                return;
+            }
+
+            throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero");
+        }
+
+        /// <summary>
+        /// True when the session is in auto-acknowledge mode, or when it is in
+        /// dups-ok mode and this consumer's destination is a queue.
+        /// </summary>
+        private bool IsAutoAcknowledgeEach
+        {
+            get
+            {
+                if(this.session.IsAutoAcknowledge)
+                {
+                    return true;
+                }
+
+                return this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue;
+            }
+        }
+
+        /// <summary>
+        /// True when the session is in dups-ok mode and this consumer's
+        /// destination is not a queue.
+        /// </summary>
+        private bool IsAutoAcknowledgeBatch
+        {
+            get
+            {
+                if(!this.session.IsDupsOkAcknowledge)
+                {
+                    return false;
+                }
+
+                return !this.info.Destination.IsQueue;
+            }
+        }
+
+        #region Nested ISynchronization Types
+
+        /// <summary>
+        /// ISynchronization implementation that forwards each callback to the
+        /// owning consumer (Acknowledge, Commit or Rollback) and then clears the
+        /// consumer's synchronizationRegistered flag.
+        /// </summary>
+        class MessageConsumerSynchronization : ISynchronization
+        {
+            private readonly MessageConsumer owner;
+
+            public MessageConsumerSynchronization(MessageConsumer consumer)
+            {
+                owner = consumer;
+            }
+
+            /// <summary>Acknowledges through the consumer, then clears the registration flag.</summary>
+            public void BeforeEnd()
+            {
+                owner.Acknowledge();
+                owner.synchronizationRegistered = false;
+            }
+
+            /// <summary>Runs the consumer's Commit, then clears the registration flag.</summary>
+            public void AfterCommit()
+            {
+                owner.Commit();
+                owner.synchronizationRegistered = false;
+            }
+
+            /// <summary>Runs the consumer's Rollback, then clears the registration flag.</summary>
+            public void AfterRollback()
+            {
+                owner.Rollback();
+                owner.synchronizationRegistered = false;
+            }
+        }
+
+        /// <summary>
+        /// ISynchronization implementation that calls DoClose on the owning
+        /// consumer after either a commit or a rollback.
+        /// </summary>
+        class ConsumerCloseSynchronization : ISynchronization
+        {
+            private readonly MessageConsumer owner;
+
+            public ConsumerCloseSynchronization(MessageConsumer consumer)
+            {
+                owner = consumer;
+            }
+
+            /// <summary>Intentionally does nothing.</summary>
+            public void BeforeEnd()
+            {
+            }
+
+            /// <summary>Closes the consumer after a commit.</summary>
+            public void AfterCommit()
+            {
+                owner.DoClose();
+            }
+
+            /// <summary>Closes the consumer after a rollback.</summary>
+            public void AfterRollback()
+            {
+                owner.DoClose();
+            }
+        }
+
+        #endregion
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs Fri Dec  4 22:53:41 2009
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Util;
+
+namespace Apache.NMS.Stomp
+{
+    /// <summary>
+    /// An object capable of sending messages to some destination
+    /// </summary>
+    public class MessageProducer : IMessageProducer
+    {
+        private Session session;
+        private bool closed = false;
+        // Guards 'closed' and serializes Close/DoClose against the actual send.
+        private object closedLock = new object();
+        private readonly ProducerInfo info;
+        // Incremented atomically for every message sent; becomes MessageId.ProducerSequenceId.
+        private int producerSequenceId = 0;
+
+        private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
+        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+        private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
+        private MsgPriority msgPriority = NMSConstants.defaultPriority;
+        private bool disableMessageID = false;
+        private bool disableMessageTimestamp = false;
+        protected bool disposed = false;
+
+        /// <summary>
+        /// Creates a producer bound to the given session. The request timeout is
+        /// initialised from the session's RequestTimeout.
+        /// </summary>
+        /// <param name="session">The session that owns this producer.</param>
+        /// <param name="info">The producer's id and (optional) default destination.</param>
+        public MessageProducer(Session session, ProducerInfo info)
+        {
+            this.session = session;
+            this.info = info;
+            this.RequestTimeout = session.RequestTimeout;
+        }
+
+        ~MessageProducer()
+        {
+            Dispose(false);
+        }
+
+        /// <summary>
+        /// Closes the producer (errors are ignored) and suppresses finalization.
+        /// </summary>
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Shared dispose logic for Dispose() and the finalizer. Calls Close() once,
+        /// swallowing any exception it throws, and marks the instance as disposed.
+        /// </summary>
+        /// <param name="disposing">True when called from Dispose(); currently unused.</param>
+        protected void Dispose(bool disposing)
+        {
+            if(disposed)
+            {
+                return;
+            }
+
+            if(disposing)
+            {
+                // Dispose managed code here.
+            }
+
+            try
+            {
+                Close();
+            }
+            catch
+            {
+                // Ignore network errors.
+            }
+
+            disposed = true;
+        }
+
+        /// <summary>
+        /// Closes the producer: performs the local close (DoClose), sends a
+        /// RemoveInfo for this producer's id over the connection and drops the
+        /// session reference. Does nothing when the producer is already closed,
+        /// including when DoClose was invoked earlier by other code.
+        /// </summary>
+        public void Close()
+        {
+            lock(closedLock)
+            {
+                if(closed)
+                {
+                    return;
+                }
+
+                DoClose();
+                RemoveInfo removeInfo = new RemoveInfo();
+                removeInfo.ObjectId = this.info.ProducerId;
+                this.session.Connection.Oneway(removeInfo);
+                this.session = null;
+            }
+        }
+
+        /// <summary>
+        /// Local part of closing: asks the session to dispose of this producer's id
+        /// (errors are logged, not rethrown) and marks the producer as closed.
+        /// Nothing is sent to the broker from here.
+        /// </summary>
+        internal void DoClose()
+        {
+            lock(closedLock)
+            {
+                if(closed)
+                {
+                    return;
+                }
+
+                try
+                {
+                    session.DisposeOf(info.ProducerId);
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during producer close: {0}", ex);
+                }
+
+                // NOTE(review): 'usage' is not declared anywhere in this class as shown,
+                // so this reference (and the one in Send) cannot resolve - confirm whether
+                // a field declaration is missing or these references should be removed.
+                if(this.usage != null)
+                {
+                    this.usage.Stop();
+                }
+
+                closed = true;
+            }
+        }
+
+        /// <summary>
+        /// Sends a message to the producer's own destination using the producer's
+        /// delivery mode and priority. The message's time-to-live is left untouched.
+        /// </summary>
+        public void Send(IMessage message)
+        {
+            Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+        }
+
+        /// <summary>
+        /// Sends a message to the given destination using the producer's delivery
+        /// mode and priority. The message's time-to-live is left untouched.
+        /// </summary>
+        public void Send(IDestination destination, IMessage message)
+        {
+            Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+        }
+
+        /// <summary>
+        /// Sends a message to the producer's own destination with an explicit
+        /// delivery mode, priority and time-to-live.
+        /// </summary>
+        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
+        }
+
+        /// <summary>
+        /// Sends a message to the given destination with an explicit delivery
+        /// mode, priority and time-to-live.
+        /// </summary>
+        public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            Send(destination, message, deliveryMode, priority, timeToLive, true);
+        }
+
+        /// <summary>
+        /// Common send path: stamps the message with producer id, destination,
+        /// delivery mode, priority, a fresh message id and (unless disabled) a UTC
+        /// timestamp, then hands it to the session while holding the close lock.
+        /// </summary>
+        /// <param name="destination">Target destination; must not be null.</param>
+        /// <param name="message">The message to send; must be a Stomp Message instance.</param>
+        /// <param name="deliveryMode">Delivery mode stored on the message.</param>
+        /// <param name="priority">Priority stored on the message.</param>
+        /// <param name="timeToLive">Time-to-live; only applied when specifiedTimeToLive is true.</param>
+        /// <param name="specifiedTimeToLive">
+        /// When false, timeToLive is ignored and the message's NMSTimeToLive is not modified.
+        /// </param>
+        /// <exception cref="NotSupportedException">
+        /// destination is null and the producer has no destination of its own.
+        /// </exception>
+        /// <exception cref="Apache.NMS.InvalidDestinationException">
+        /// destination is null although the producer has a destination of its own.
+        /// </exception>
+        /// <exception cref="ArgumentNullException">message is null.</exception>
+        /// <exception cref="InvalidCastException">message is not a Stomp Message.</exception>
+        /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
+        protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+        {
+            if(null == destination)
+            {
+                // See if this producer was created without a destination.
+                if(null == info.Destination)
+                {
+                    throw new NotSupportedException();
+                }
+
+                // The producer was created with a destination, but an invalid destination
+                // was specified.
+                throw new Apache.NMS.InvalidDestinationException();
+            }
+
+            // Fail with a descriptive exception instead of a NullReferenceException
+            // on the first property assignment below.
+            if(null == message)
+            {
+                throw new ArgumentNullException("message");
+            }
+
+            Message stompMessage = (Message) message;
+
+            stompMessage.ProducerId = info.ProducerId;
+            stompMessage.FromDestination = destination;
+            stompMessage.NMSDeliveryMode = deliveryMode;
+            stompMessage.NMSPriority = priority;
+
+            // Always set the message Id regardless of the disable flag.
+            MessageId id = new MessageId();
+            id.ProducerId = info.ProducerId;
+            id.ProducerSequenceId = Interlocked.Increment(ref this.producerSequenceId);
+            stompMessage.MessageId = id;
+
+            if(!disableMessageTimestamp)
+            {
+                stompMessage.NMSTimestamp = DateTime.UtcNow;
+            }
+
+            if(specifiedTimeToLive)
+            {
+                stompMessage.NMSTimeToLive = timeToLive;
+            }
+
+            // The closed check and the send happen under the same lock so that a
+            // concurrent Close cannot null out the session in between.
+            lock(closedLock)
+            {
+                if(closed)
+                {
+                    throw new ConnectionClosedException();
+                }
+
+                // NOTE(review): 'usage' is not declared in this class as shown - see DoClose.
+                session.DoSend(stompMessage, this, this.usage, this.RequestTimeout);
+            }
+        }
+
+        /// <summary>The id of this producer, taken from its ProducerInfo.</summary>
+        public ProducerId ProducerId
+        {
+            get { return info.ProducerId; }
+        }
+
+        /// <summary>Delivery mode used by the Send overloads that do not take one.</summary>
+        public MsgDeliveryMode DeliveryMode
+        {
+            get { return msgDeliveryMode; }
+            set { this.msgDeliveryMode = value; }
+        }
+
+        /// <summary>
+        /// The producer's time-to-live setting. The Send overloads that do not take
+        /// an explicit time-to-live pass this value along but flag it as unspecified,
+        /// so it is not written to the message.
+        /// </summary>
+        public TimeSpan TimeToLive
+        {
+            get { return msgTimeToLive; }
+            set { this.msgTimeToLive = value; }
+        }
+
+        /// <summary>Timeout handed to the session for each send.</summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return requestTimeout; }
+            set { this.requestTimeout = value; }
+        }
+
+        /// <summary>Priority used by the Send overloads that do not take one.</summary>
+        public MsgPriority Priority
+        {
+            get { return msgPriority; }
+            set { this.msgPriority = value; }
+        }
+
+        /// <summary>
+        /// Stored flag only: a message id is always assigned on send regardless
+        /// of this value.
+        /// </summary>
+        public bool DisableMessageID
+        {
+            get { return disableMessageID; }
+            set { this.disableMessageID = value; }
+        }
+
+        /// <summary>When true, Send does not stamp NMSTimestamp on the message.</summary>
+        public bool DisableMessageTimestamp
+        {
+            get { return disableMessageTimestamp; }
+            set { this.disableMessageTimestamp = value; }
+        }
+
+        // The factory methods below simply delegate to the owning session.
+
+        public IMessage CreateMessage()
+        {
+            return session.CreateMessage();
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            return session.CreateTextMessage();
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            return session.CreateTextMessage(text);
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return session.CreateMapMessage();
+        }
+
+        /// <summary>Always throws: object messages are not supported.</summary>
+        /// <exception cref="NotSupportedException">Always.</exception>
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            throw new NotSupportedException("No Object Message in Stomp");
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return session.CreateBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            return session.CreateBytesMessage(body);
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return session.CreateStreamMessage();
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/PrefetchPolicy.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/PrefetchPolicy.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/PrefetchPolicy.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/PrefetchPolicy.cs Fri Dec  4 22:53:41 2009
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Apache.NMS;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp
+{
+    /// <summary>
+    /// Class used to define the various limits that should be used for the Prefetch
+    /// limit on destination based on the type of Destination in use.
+    /// Every prefetch value stored through this class is capped at
+    /// <see cref="MAX_PREFETCH_SIZE"/>.
+    /// </summary>
+    public class PrefetchPolicy : ICloneable
+    {
+        public const int MAX_PREFETCH_SIZE = Int16.MaxValue - 1;
+        public const int DEFAULT_QUEUE_PREFETCH = 1000;
+        public const int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
+        public const int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
+        public const int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
+
+        private int queuePrefetch;
+        private int queueBrowserPrefetch;
+        private int topicPrefetch;
+        private int durableTopicPrefetch;
+        private int maximumPendingMessageLimit;
+
+        /// <summary>
+        /// Creates a policy holding the DEFAULT_* prefetch values. The maximum
+        /// pending message limit is left at its default of zero.
+        /// </summary>
+        public PrefetchPolicy()
+        {
+            this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
+            this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
+            this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
+            this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
+        }
+
+        /// <summary>Prefetch limit for queues; capped at MAX_PREFETCH_SIZE when set.</summary>
+        public int QueuePrefetch
+        {
+            get { return this.queuePrefetch; }
+            set { this.queuePrefetch = RestrictToMaximum(value); }
+        }
+
+        /// <summary>Prefetch limit for queue browsers; capped at MAX_PREFETCH_SIZE when set.</summary>
+        public int QueueBrowserPrefetch
+        {
+            get { return this.queueBrowserPrefetch; }
+            set { this.queueBrowserPrefetch = RestrictToMaximum(value); }
+        }
+
+        /// <summary>Prefetch limit for topics; capped at MAX_PREFETCH_SIZE when set.</summary>
+        public int TopicPrefetch
+        {
+            get { return this.topicPrefetch; }
+            set { this.topicPrefetch = RestrictToMaximum(value); }
+        }
+
+        /// <summary>Prefetch limit for durable topics; capped at MAX_PREFETCH_SIZE when set.</summary>
+        public int DurableTopicPrefetch
+        {
+            get { return this.durableTopicPrefetch; }
+            set { this.durableTopicPrefetch = RestrictToMaximum(value); }
+        }
+
+        /// <summary>Maximum pending message limit; stored as given, without capping.</summary>
+        public int MaximumPendingMessageLimit
+        {
+            get { return this.maximumPendingMessageLimit; }
+            set { this.maximumPendingMessageLimit = value; }
+        }
+
+        /// <summary>
+        /// Sets all four prefetch limits to the same value. The value is capped at
+        /// MAX_PREFETCH_SIZE, exactly as the individual property setters do, so the
+        /// cap cannot be bypassed through this method.
+        /// </summary>
+        /// <param name="value">The requested prefetch limit.</param>
+        public void SetAll(int value)
+        {
+            int restricted = RestrictToMaximum(value);
+
+            this.queuePrefetch = restricted;
+            this.queueBrowserPrefetch = restricted;
+            this.topicPrefetch = restricted;
+            this.durableTopicPrefetch = restricted;
+        }
+
+        /// <summary>Returns the smaller of value and MAX_PREFETCH_SIZE.</summary>
+        private int RestrictToMaximum(int value)
+        {
+            return System.Math.Min(value, MAX_PREFETCH_SIZE);
+        }
+
+        /// <summary>Returns a member-wise copy of this policy.</summary>
+        public Object Clone()
+        {
+            return this.MemberwiseClone();
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/PrefetchPolicy.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs Fri Dec  4 22:53:41 2009
@@ -0,0 +1,834 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Threading;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Util;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp
+{
+    /// <summary>
+    /// Default provider of ISession
+    /// </summary>
+    public class Session : ISession, IDispatcher
+    {
+        /// <summary>
+        /// Private object used for synchronization, instead of public "this"
+        /// </summary>
+        private readonly object myLock = new object();
+
+        // Consumers and producers created by this session, keyed by their ids.
+        // Both are synchronized Hashtable wrappers.
+        private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+        private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+
+        private SessionExecutor executor;
+        // Only created when the session's acknowledgement mode is Transactional.
+        private TransactionContext transactionContext;
+        // Set to null once Close() has informed the broker.
+        private Connection connection;
+
+        // Plain stored settings exposed through the properties of the same name.
+        private bool dispatchAsync;
+        private bool exclusive;
+        private bool retroactive;
+        private byte priority;
+
+        private readonly SessionInfo info;
+        // NOTE(review): presumably used to build consumer/producer ids - usage is outside this view.
+        private int consumerCounter;
+        private int producerCounter;
+        // Incremented atomically by the NextDeliveryId property.
+        private long nextDeliveryId;
+        // Updated from the consumers in DoClose and reported to the broker in Close.
+        private long lastDeliveredSequenceId;
+        private bool disposed = false;
+        private bool closed = false;
+        // True only while DoClose is running.
+        private bool closing = false;
+        private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
+        private AcknowledgementMode acknowledgementMode;
+
+        /// <summary>
+        /// Creates a session on the given connection. The request timeout is taken
+        /// from the connection, a TransactionContext is created only for the
+        /// Transactional acknowledgement mode, and a SessionExecutor is set up over
+        /// this session's consumer table.
+        /// </summary>
+        /// <param name="connection">The owning connection.</param>
+        /// <param name="info">Identifies this session.</param>
+        /// <param name="acknowledgementMode">The acknowledgement mode for this session.</param>
+        public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
+        {
+            this.connection = connection;
+            this.info = info;
+            this.acknowledgementMode = acknowledgementMode;
+            this.requestTimeout = connection.RequestTimeout;
+
+            // IsTransacted reads the acknowledgement mode stored just above.
+            if(this.IsTransacted)
+            {
+                this.transactionContext = new TransactionContext(this);
+            }
+
+            this.executor = new SessionExecutor(this, this.consumers);
+        }
+
+        // Finalizer: runs the shared dispose logic with disposing == false.
+        ~Session()
+        {
+            Dispose(false);
+        }
+
+        #region Property Accessors
+
+        /// <summary>
+        /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
+        /// until acknowledgements are received. Write-only; applied to every limit of the
+        /// connection's prefetch policy.
+        /// </summary>
+        public int PrefetchSize
+        {
+            set
+            {
+                this.connection.PrefetchPolicy.SetAll(value);
+            }
+        }
+
+        /// <summary>
+        /// Sets the maximum number of messages to keep around per consumer
+        /// in addition to the prefetch window for non-durable topics until messages
+        /// will start to be evicted for slow consumers.
+        /// Must be > 0 to enable this feature. Write-only; stored on the connection's
+        /// prefetch policy.
+        /// </summary>
+        public int MaximumPendingMessageLimit
+        {
+            set
+            {
+                this.connection.PrefetchPolicy.MaximumPendingMessageLimit = value;
+            }
+        }
+
+        /// <summary>
+        /// Enables or disables whether asynchronous dispatch should be used by the broker
+        /// </summary>
+        public bool DispatchAsync
+        {
+            get
+            {
+                return this.dispatchAsync;
+            }
+            set
+            {
+                this.dispatchAsync = value;
+            }
+        }
+
+        /// <summary>
+        /// Enables or disables exclusive consumers when using queues. An exclusive consumer means
+        /// only one instance of a consumer is allowed to process messages on a queue to preserve order
+        /// </summary>
+        public bool Exclusive
+        {
+            get
+            {
+                return this.exclusive;
+            }
+            set
+            {
+                this.exclusive = value;
+            }
+        }
+
+        /// <summary>
+        /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
+        /// </summary>
+        public bool Retroactive
+        {
+            get
+            {
+                return this.retroactive;
+            }
+            set
+            {
+                this.retroactive = value;
+            }
+        }
+
+        /// <summary>
+        /// Sets the default consumer priority for consumers
+        /// </summary>
+        public byte Priority
+        {
+            get
+            {
+                return this.priority;
+            }
+            set
+            {
+                this.priority = value;
+            }
+        }
+
+        /// <summary>The connection this session belongs to.</summary>
+        public Connection Connection
+        {
+            get
+            {
+                return this.connection;
+            }
+        }
+
+        /// <summary>The id of this session, taken from its SessionInfo.</summary>
+        public SessionId SessionId
+        {
+            get
+            {
+                return info.SessionId;
+            }
+        }
+
+        /// <summary>The transaction context; null unless the session is transactional.</summary>
+        public TransactionContext TransactionContext
+        {
+            get
+            {
+                return this.transactionContext;
+            }
+        }
+
+        /// <summary>The request timeout for this session; initialised from the connection.</summary>
+        public TimeSpan RequestTimeout
+        {
+            get
+            {
+                return this.requestTimeout;
+            }
+            set
+            {
+                this.requestTimeout = value;
+            }
+        }
+
+        /// <summary>True when the acknowledgement mode is Transactional (same result as IsTransacted).</summary>
+        public bool Transacted
+        {
+            get
+            {
+                return this.IsTransacted;
+            }
+        }
+
+        /// <summary>The acknowledgement mode this session was created with.</summary>
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get
+            {
+                return this.acknowledgementMode;
+            }
+        }
+
+        /// <summary>True when the acknowledgement mode is ClientAcknowledge.</summary>
+        public bool IsClientAcknowledge
+        {
+            get
+            {
+                return AcknowledgementMode.ClientAcknowledge == this.acknowledgementMode;
+            }
+        }
+
+        /// <summary>True when the acknowledgement mode is AutoAcknowledge.</summary>
+        public bool IsAutoAcknowledge
+        {
+            get
+            {
+                return AcknowledgementMode.AutoAcknowledge == this.acknowledgementMode;
+            }
+        }
+
+        /// <summary>True when the acknowledgement mode is DupsOkAcknowledge.</summary>
+        public bool IsDupsOkAcknowledge
+        {
+            get
+            {
+                return AcknowledgementMode.DupsOkAcknowledge == this.acknowledgementMode;
+            }
+        }
+
+        /// <summary>True when the acknowledgement mode is IndividualAcknowledge.</summary>
+        public bool IsIndividualAcknowledge
+        {
+            get
+            {
+                return AcknowledgementMode.IndividualAcknowledge == this.acknowledgementMode;
+            }
+        }
+
+        /// <summary>True when the acknowledgement mode is Transactional.</summary>
+        public bool IsTransacted
+        {
+            get
+            {
+                return AcknowledgementMode.Transactional == this.acknowledgementMode;
+            }
+        }
+
+        /// <summary>The executor created for this session.</summary>
+        public SessionExecutor Executor
+        {
+            get
+            {
+                return this.executor;
+            }
+        }
+
+        /// <summary>
+        /// Returns the next delivery id. Note that every read atomically increments
+        /// the underlying counter, so two reads never return the same value.
+        /// </summary>
+        public long NextDeliveryId
+        {
+            get
+            {
+                return Interlocked.Increment(ref this.nextDeliveryId);
+            }
+        }
+
+        #endregion
+
+        #region ISession Members
+
+        /// <summary>
+        /// Disposes the session (which closes it, ignoring errors) and tells the
+        /// garbage collector that the finalizer no longer needs to run.
+        /// </summary>
+        public void Dispose()
+        {
+            this.Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        /// <summary>
+        /// Shared dispose logic for Dispose() and the finalizer. On the first call
+        /// it closes the session, swallowing any exception, and marks the session
+        /// as disposed; later calls do nothing.
+        /// </summary>
+        /// <param name="disposing">True when called from Dispose(); currently unused.</param>
+        protected void Dispose(bool disposing)
+        {
+            if(!this.disposed)
+            {
+                // There is no managed-only cleanup, so 'disposing' makes no difference here.
+                try
+                {
+                    Close();
+                }
+                catch
+                {
+                    // Ignore network errors.
+                }
+
+                this.disposed = true;
+            }
+        }
+
+        /// <summary>
+        /// Closes the session. Runs the local close (DoClose), logging instead of
+        /// propagating any error from it, and afterwards always sends a RemoveInfo
+        /// carrying the session id and the last delivered sequence id over the
+        /// connection. Does nothing when the session is already closed.
+        /// </summary>
+        public void Close()
+        {
+            lock(myLock)
+            {
+                if(this.closed)
+                {
+                    return;
+                }
+
+                try
+                {
+                    Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId.ToString());
+                    DoClose();
+                    Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId.ToString());
+                }
+                catch(Exception error)
+                {
+                    Tracer.ErrorFormat("Error during session close: {0}", error);
+                }
+                finally
+                {
+                    // Make sure we attempt to inform the broker this Session is done.
+                    RemoveInfo removeInfo = new RemoveInfo();
+                    removeInfo.ObjectId = this.info.SessionId;
+                    removeInfo.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+                    this.connection.Oneway(removeInfo);
+
+                    // The session is unusable from here on.
+                    this.connection = null;
+                    this.closed = true;
+                    this.closing = false;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Local part of closing the session: stops delivery, closes every consumer
+        /// and producer, rolls back an in-progress transaction and removes the
+        /// session from its connection. Errors are logged, never propagated, and
+        /// the session is always marked closed afterwards. Nothing is sent to the
+        /// broker for the session itself from here; Close() does that.
+        /// </summary>
+        internal void DoClose()
+        {
+            lock(myLock)
+            {
+                if(this.closed)
+                {
+                    return;
+                }
+
+                try
+                {
+                    this.closing = true;
+
+                    // Stop all message deliveries from this Session
+                    Stop();
+
+                    lock(consumers.SyncRoot)
+                    {
+                        foreach(MessageConsumer consumer in consumers.Values)
+                        {
+                            consumer.DoClose();
+
+                            // Track the highest sequence id delivered by any consumer. The
+                            // field starts at zero, so taking the minimum here could never
+                            // record a positive id.
+                            this.lastDeliveredSequenceId =
+                                Math.Max(this.lastDeliveredSequenceId, consumer.LastDeliveredSequenceId);
+                        }
+                    }
+                    consumers.Clear();
+
+                    lock(producers.SyncRoot)
+                    {
+                        foreach(MessageProducer producer in producers.Values)
+                        {
+                            producer.DoClose();
+                        }
+                    }
+                    producers.Clear();
+
+                    // If in a transaction roll it back
+                    if(this.IsTransacted && this.transactionContext.InTransaction)
+                    {
+                        try
+                        {
+                            this.transactionContext.Rollback();
+                        }
+                        catch
+                        {
+                            // Best effort only: a failed rollback must not stop the close.
+                        }
+                    }
+
+                    Connection.RemoveSession(this);
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during session close: {0}", ex);
+                }
+                finally
+                {
+                    this.closed = true;
+                    this.closing = false;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Creates a producer without a default destination (passes a null
+        /// destination to the one-argument overload).
+        /// </summary>
+        public IMessageProducer CreateProducer()
+        {
+            return this.CreateProducer(null);
+        }
+
+        /// <summary>
+        /// Creates a producer for the given destination, records it in this
+        /// session's producer table, announces it over the connection and finally
+        /// registers it with the connection. If any step up to and including the
+        /// announcement throws, an already constructed producer is closed and the
+        /// exception is rethrown.
+        /// </summary>
+        /// <param name="destination">The producer's default destination; may be null.</param>
+        /// <returns>The newly created producer.</returns>
+        public IMessageProducer CreateProducer(IDestination destination)
+        {
+            ProducerInfo producerInfo = CreateProducerInfo(destination);
+            ProducerId id = producerInfo.ProducerId;
+            MessageProducer created = null;
+
+            try
+            {
+                created = new MessageProducer(this, producerInfo);
+                producers[id] = created;
+                this.connection.Oneway(producerInfo);
+            }
+            catch(Exception)
+            {
+                // Undo the partial creation before passing the failure on.
+                if(created != null)
+                {
+                    created.Close();
+                }
+
+                throw;
+            }
+
+            // Registered with Connection so it can process Producer Acks.
+            connection.addProducer(id, created);
+
+            return created;
+        }
+
+        /// <summary>
+        /// Creates a consumer on the destination with no selector and noLocal
+        /// set to false.
+        /// </summary>
+        public IMessageConsumer CreateConsumer(IDestination destination)
+        {
+            const string noSelector = null;
+            const bool deliverLocal = false;
+            return CreateConsumer(destination, noSelector, deliverLocal);
+        }
+
+        /// <summary>
+        /// Creates a consumer on the destination using the given selector and
+        /// noLocal set to false.
+        /// </summary>
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+        {
+            const bool deliverLocal = false;
+            return CreateConsumer(destination, selector, deliverLocal);
+        }
+
+        /// <summary>
+        /// Creates a consumer on the given destination.  This session is registered
+        /// with the connection as the dispatcher for the new consumer id, the consumer
+        /// is stored in the session's consumer table, and the ConsumerInfo is sent to
+        /// the connection with a synchronous request.  The consumer is started when
+        /// the session is already started.
+        /// </summary>
+        /// <param name="destination">Destination to consume from; must not be null.</param>
+        /// <param name="selector">Selector copied into the ConsumerInfo.</param>
+        /// <param name="noLocal">Value assigned to the ConsumerInfo's NoLocal property.</param>
+        /// <returns>The newly created consumer.</returns>
+        /// <exception cref="InvalidDestinationException">destination is null.</exception>
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+        {
+            if (destination == null)
+            {
+                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+            }
+
+            ConsumerInfo command = CreateConsumerInfo(destination, selector);
+            command.NoLocal = noLocal;
+            ConsumerId consumerId = command.ConsumerId;
+            MessageConsumer consumer = null;
+
+            // Registered with Connection before we register at the broker.
+            connection.addDispatcher(consumerId, this);
+
+            try
+            {
+                consumer = new MessageConsumer(this, command);
+                // lets register the consumer first in case we start dispatching messages immediately
+                consumers[consumerId] = consumer;
+                this.Connection.SyncRequest(command);
+
+                if(this.Started)
+                {
+                    consumer.Start();
+                }
+
+                return consumer;
+            }
+            catch(Exception)
+            {
+                if(consumer != null)
+                {
+                    consumer.Close();
+                }
+                else
+                {
+                    // The consumer was never constructed, so there is nothing to close;
+                    // remove the dispatcher registered above so it is not left behind.
+                    connection.removeDispatcher(consumerId);
+                }
+
+                throw;
+            }
+        }
+
+        /// <summary>
+        /// Creates a durable consumer on the given topic.  The ConsumerInfo carries
+        /// the subscription name, the noLocal flag and the connection's durable topic
+        /// prefetch size.  This session is registered with the connection as the
+        /// dispatcher for the new consumer id before the ConsumerInfo is sent with a
+        /// synchronous request.
+        /// </summary>
+        /// <param name="destination">Topic to subscribe to; must not be null.</param>
+        /// <param name="name">Value assigned to the ConsumerInfo's SubscriptionName.</param>
+        /// <param name="selector">Selector copied into the ConsumerInfo.</param>
+        /// <param name="noLocal">Value assigned to the ConsumerInfo's NoLocal property.</param>
+        /// <returns>The newly created consumer.</returns>
+        /// <exception cref="InvalidDestinationException">destination is null.</exception>
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+        {
+            if (destination == null)
+            {
+                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+            }
+
+            ConsumerInfo command = CreateConsumerInfo(destination, selector);
+            ConsumerId consumerId = command.ConsumerId;
+            command.SubscriptionName = name;
+            command.NoLocal = noLocal;
+            command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch;
+            MessageConsumer consumer = null;
+
+            // Registered with Connection before we register at the broker.
+            connection.addDispatcher(consumerId, this);
+
+            try
+            {
+                consumer = new MessageConsumer(this, command);
+                // lets register the consumer first in case we start dispatching messages immediately
+                consumers[consumerId] = consumer;
+
+                // NOTE(review): the consumer is started before the broker request is
+                // sent here; confirm this ordering is intended.
+                if(this.Started)
+                {
+                    consumer.Start();
+                }
+
+                this.connection.SyncRequest(command);
+            }
+            catch(Exception)
+            {
+                if(consumer != null)
+                {
+                    consumer.Close();
+                }
+                else
+                {
+                    // The consumer was never constructed, so there is nothing to close;
+                    // remove the dispatcher registered above so it is not left behind.
+                    connection.removeDispatcher(consumerId);
+                }
+
+                throw;
+            }
+
+            return consumer;
+        }
+
+        /// <summary>
+        /// Sends a RemoveSubscriptionInfo for the named subscription to the
+        /// connection using a synchronous request.
+        /// </summary>
+        /// <param name="name">Subscription name placed in the command.</param>
+        public void DeleteDurableConsumer(string name)
+        {
+            RemoveSubscriptionInfo removal = new RemoveSubscriptionInfo();
+
+            removal.SubcriptionName = name;
+            removal.ClientId = Connection.ClientId;
+            removal.ConnectionId = Connection.ConnectionId;
+
+            this.connection.SyncRequest(removal);
+        }
+
+        /// <summary>
+        /// Queue browsing is not implemented; always throws.
+        /// </summary>
+        /// <exception cref="NotSupportedException">Always thrown.</exception>
+        public IQueueBrowser CreateBrowser(IQueue queue)
+        {
+            const string reason = "Not Yet Implemented";
+            throw new NotSupportedException(reason);
+        }
+
+        /// <summary>
+        /// Queue browsing with a selector is not implemented; always throws.
+        /// </summary>
+        /// <exception cref="NotSupportedException">Always thrown.</exception>
+        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+        {
+            const string reason = "Not Yet Implemented";
+            throw new NotSupportedException(reason);
+        }
+
+        /// <summary>
+        /// Returns a new queue object constructed from the given name.
+        /// </summary>
+        public IQueue GetQueue(string name)
+        {
+            IQueue queue = new ActiveMQQueue(name);
+            return queue;
+        }
+
+        /// <summary>
+        /// Returns a new topic object constructed from the given name.
+        /// </summary>
+        public ITopic GetTopic(string name)
+        {
+            ITopic topic = new ActiveMQTopic(name);
+            return topic;
+        }
+
+        /// <summary>
+        /// Creates a temporary queue whose name comes from the connection and
+        /// announces it through CreateTemporaryDestination.
+        /// </summary>
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            ActiveMQTempQueue tempQueue =
+                new ActiveMQTempQueue(Connection.CreateTemporaryDestinationName());
+
+            CreateTemporaryDestination(tempQueue);
+
+            return tempQueue;
+        }
+
+        /// <summary>
+        /// Creates a temporary topic whose name comes from the connection and
+        /// announces it through CreateTemporaryDestination.
+        /// </summary>
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            ActiveMQTempTopic tempTopic =
+                new ActiveMQTempTopic(Connection.CreateTemporaryDestinationName());
+
+            CreateTemporaryDestination(tempTopic);
+
+            return tempTopic;
+        }
+
+        /// <summary>
+        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic) by sending a
+        /// DestinationInfo with the remove operation type to the connection as a
+        /// one-way command.
+        /// </summary>
+        /// <param name="destination">Destination to remove; cast to Destination.</param>
+        public void DeleteDestination(IDestination destination)
+        {
+            DestinationInfo removal = new DestinationInfo();
+
+            removal.Destination = (Destination) destination;
+            removal.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
+            removal.ConnectionId = Connection.ConnectionId;
+
+            this.connection.Oneway(removal);
+        }
+
+        /// <summary>
+        /// Creates a new Message and passes it through ConfigureMessage.
+        /// </summary>
+        public IMessage CreateMessage()
+        {
+            Message configured = ConfigureMessage(new Message());
+            return configured as IMessage;
+        }
+
+        /// <summary>
+        /// Creates an empty text message and passes it through ConfigureMessage.
+        /// </summary>
+        public ITextMessage CreateTextMessage()
+        {
+            Message configured = ConfigureMessage(new ActiveMQTextMessage());
+            return configured as ITextMessage;
+        }
+
+        /// <summary>
+        /// Creates a text message constructed with the given text and passes it
+        /// through ConfigureMessage.
+        /// </summary>
+        public ITextMessage CreateTextMessage(string text)
+        {
+            Message configured = ConfigureMessage(new ActiveMQTextMessage(text));
+            return configured as ITextMessage;
+        }
+
+        /// <summary>
+        /// Creates a map message and passes it through ConfigureMessage.
+        /// </summary>
+        public IMapMessage CreateMapMessage()
+        {
+            Message configured = ConfigureMessage(new ActiveMQMapMessage());
+            return configured as IMapMessage;
+        }
+
+        /// <summary>
+        /// Creates an empty bytes message and passes it through ConfigureMessage.
+        /// </summary>
+        public IBytesMessage CreateBytesMessage()
+        {
+            Message configured = ConfigureMessage(new ActiveMQBytesMessage());
+            return configured as IBytesMessage;
+        }
+
+        /// <summary>
+        /// Creates a bytes message whose Content is the given array and passes it
+        /// through ConfigureMessage.
+        /// </summary>
+        /// <param name="body">Array assigned to the message's Content property.</param>
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            BytesMessage bytesMessage = new BytesMessage();
+            bytesMessage.Content = body;
+
+            Message configured = ConfigureMessage(bytesMessage);
+            return configured as IBytesMessage;
+        }
+
+        /// <summary>
+        /// Creates a stream message and passes it through ConfigureMessage.
+        /// </summary>
+        public IStreamMessage CreateStreamMessage()
+        {
+            Message configured = ConfigureMessage(new ActiveMQStreamMessage());
+            return configured as IStreamMessage;
+        }
+
+        /// <summary>
+        /// Object messages are not available in this provider; always throws.
+        /// </summary>
+        /// <param name="body">Ignored.</param>
+        /// <exception cref="NotSupportedException">Always thrown.</exception>
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            // The exception must be instantiated with 'new'; without it the
+            // statement is parsed as a method call and does not compile.
+            throw new NotSupportedException("No Object Message in Stomp");
+        }
+
+        /// <summary>
+        /// Commits the session's transaction context.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">
+        /// The session is not transacted.
+        /// </exception>
+        public void Commit()
+        {
+            if(Transacted)
+            {
+                this.TransactionContext.Commit();
+                return;
+            }
+
+            throw new InvalidOperationException(
+                    "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+                    + this.AcknowledgementMode);
+        }
+
+        /// <summary>
+        /// Rolls back the session's transaction context.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">
+        /// The session is not transacted.
+        /// </exception>
+        public void Rollback()
+        {
+            if(!Transacted)
+            {
+                // The message names Rollback(), the operation that was actually attempted.
+                throw new InvalidOperationException(
+                        "You cannot perform a Rollback() on a non-transacted session. Acknowlegement mode is: "
+                        + this.AcknowledgementMode);
+            }
+
+            this.TransactionContext.Rollback();
+        }
+
+        #endregion
+
+        /// <summary>
+        /// Sends a DestinationInfo with the add operation type for the given
+        /// temporary destination to the connection using a synchronous request.
+        /// </summary>
+        /// <param name="tempDestination">Destination placed in the command.</param>
+        protected void CreateTemporaryDestination(Destination tempDestination)
+        {
+            DestinationInfo addition = new DestinationInfo();
+
+            addition.Destination = tempDestination;
+            addition.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
+            addition.ConnectionId = Connection.ConnectionId;
+
+            this.connection.SyncRequest(addition);
+        }
+
+        /// <summary>
+        /// Prepares a message and sends it through the connection.  In a transacted
+        /// session a transaction is begun and its id is stamped on the message.  The
+        /// redelivery counter and broker path are reset, the message is cloned when
+        /// the connection's CopyMessageOnSend is set, and OnSend() is invoked before
+        /// the send.
+        /// </summary>
+        /// <param name="message">Message to send; it is modified before any clone is made.</param>
+        /// <param name="producer">Not used by this method.</param>
+        /// <param name="sendTimeout">
+        /// When positive, the message is sent with a synchronous request bounded by
+        /// this timeout.
+        /// </param>
+        public void DoSend( Message message, MessageProducer producer, TimeSpan sendTimeout )
+        {
+            Message outgoing = message;
+
+            if(Transacted)
+            {
+                DoStartTransaction();
+                outgoing.TransactionId = TransactionContext.TransactionId;
+            }
+
+            outgoing.RedeliveryCounter = 0;
+            outgoing.BrokerPath = null;
+
+            if(this.connection.CopyMessageOnSend)
+            {
+                outgoing = (Message)outgoing.Clone();
+            }
+
+            outgoing.OnSend();
+            outgoing.ProducerId = outgoing.MessageId.ProducerId;
+
+            // A positive timeout always forces a synchronous, time-limited request.
+            if(sendTimeout.TotalMilliseconds > 0)
+            {
+                this.connection.SyncRequest(outgoing, sendTimeout);
+                return;
+            }
+
+            // Without a timeout the send is synchronous when a response is required,
+            // when the connection always sends synchronously, or when the message is
+            // persistent, async sends are off and the message carries no transaction id.
+            bool mustSync = outgoing.ResponseRequired || connection.AlwaysSyncSend ||
+                            (outgoing.Persistent && !connection.AsyncSend && outgoing.TransactionId == null);
+
+            if(mustSync)
+            {
+                this.connection.SyncRequest(outgoing);
+            }
+            else
+            {
+                this.connection.Oneway(outgoing);
+            }
+        }
+
+        /// <summary>
+        /// Calls Begin() on the transaction context when the session is transacted;
+        /// does nothing otherwise.
+        /// </summary>
+        public void DoStartTransaction()
+        {
+            if(!Transacted)
+            {
+                return;
+            }
+
+            this.TransactionContext.Begin();
+        }
+
+        /// <summary>
+        /// Removes the connection's dispatcher entry for a consumer, lowers the
+        /// session's last delivered sequence id to the consumer's value when that is
+        /// smaller, and drops the consumer from the session's table unless the
+        /// session is closing.
+        /// </summary>
+        /// <param name="objectId">Id of the consumer being disposed.</param>
+        /// <param name="lastDeliveredSequenceId">Last sequence id reported for the consumer.</param>
+        public void DisposeOf(ConsumerId objectId, long lastDeliveredSequenceId)
+        {
+            connection.removeDispatcher(objectId);
+
+            long lowest = Math.Min(this.lastDeliveredSequenceId, lastDeliveredSequenceId);
+            this.lastDeliveredSequenceId = lowest;
+
+            if(this.closing)
+            {
+                return;
+            }
+
+            consumers.Remove(objectId);
+        }
+
+        /// <summary>
+        /// Removes the producer from the connection and, unless the session is
+        /// closing, from the session's producer table.
+        /// </summary>
+        /// <param name="objectId">Id of the producer being disposed.</param>
+        public void DisposeOf(ProducerId objectId)
+        {
+            connection.removeProducer(objectId);
+
+            if(this.closing)
+            {
+                return;
+            }
+
+            producers.Remove(objectId);
+        }
+
+        /// <summary>
+        /// Builds the ConsumerInfo for a new consumer: assigns a fresh ConsumerId
+        /// derived from this session's id and the consumer counter, copies the
+        /// session's consumer settings, and picks the prefetch size from the
+        /// connection's prefetch policy according to the destination kind.  Options
+        /// carried by the destination are applied last using the "consumer." prefix.
+        /// </summary>
+        /// <param name="destination">Destination the consumer will read from.</param>
+        /// <param name="selector">Selector copied into the ConsumerInfo.</param>
+        /// <returns>The populated ConsumerInfo.</returns>
+        protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
+        {
+            ConsumerId consumerId = new ConsumerId();
+            consumerId.ConnectionId = info.SessionId.ConnectionId;
+            consumerId.SessionId = info.SessionId.Value;
+            consumerId.Value = Interlocked.Increment(ref consumerCounter);
+
+            ConsumerInfo consumerInfo = new ConsumerInfo();
+            consumerInfo.ConsumerId = consumerId;
+            consumerInfo.Destination = Destination.Transform(destination);
+            consumerInfo.Selector = selector;
+            consumerInfo.Priority = this.Priority;
+            consumerInfo.Exclusive = this.Exclusive;
+            consumerInfo.DispatchAsync = this.DispatchAsync;
+            consumerInfo.Retroactive = this.Retroactive;
+            consumerInfo.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
+
+            // Topics take precedence over queues; any other destination kind leaves
+            // the prefetch size untouched.
+            bool isTopic = destination is ITopic || destination is ITemporaryTopic;
+            if(isTopic)
+            {
+                consumerInfo.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
+            }
+            else
+            {
+                bool isQueue = destination is IQueue || destination is ITemporaryQueue;
+                if(isQueue)
+                {
+                    consumerInfo.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
+                }
+            }
+
+            // If the destination contained a URI query, then use it to set public properties
+            // on the ConsumerInfo
+            Destination stompDestination = destination as Destination;
+            if(stompDestination != null && stompDestination.Options != null)
+            {
+                URISupport.SetProperties(consumerInfo, stompDestination.Options, "consumer.");
+            }
+
+            return consumerInfo;
+        }
+
+        /// <summary>
+        /// Builds the ProducerInfo for a new producer: assigns a fresh ProducerId
+        /// derived from this session's id and the producer counter, sets the
+        /// transformed destination and the connection's producer window size, then
+        /// applies any options carried by the destination using the "producer." prefix.
+        /// </summary>
+        /// <param name="destination">Destination the producer will send to.</param>
+        /// <returns>The populated ProducerInfo.</returns>
+        protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
+        {
+            ProducerId producerId = new ProducerId();
+            producerId.ConnectionId = info.SessionId.ConnectionId;
+            producerId.SessionId = info.SessionId.Value;
+            producerId.Value = Interlocked.Increment(ref producerCounter);
+
+            ProducerInfo producerInfo = new ProducerInfo();
+            producerInfo.ProducerId = producerId;
+            producerInfo.Destination = Destination.Transform(destination);
+            producerInfo.WindowSize = connection.ProducerWindowSize;
+
+            // If the destination contained a URI query, then use it to set public
+            // properties on the ProducerInfo
+            Destination stompDestination = destination as Destination;
+            if(stompDestination != null && stompDestination.Options != null)
+            {
+                URISupport.SetProperties(producerInfo, stompDestination.Options, "producer.");
+            }
+
+            return producerInfo;
+        }
+
+        /// <summary>
+        /// Stops the session's executor, if one exists.
+        /// </summary>
+        public void Stop()
+        {
+            if(this.executor == null)
+            {
+                return;
+            }
+
+            this.executor.Stop();
+        }
+
+        /// <summary>
+        /// Starts every consumer registered with this session and then starts the
+        /// session's executor, if one exists.
+        /// </summary>
+        public void Start()
+        {
+            // NOTE(review): the consumer table is enumerated here without taking its
+            // SyncRoot lock - confirm that is intended.
+            foreach(object registered in this.consumers.Values)
+            {
+                ((MessageConsumer) registered).Start();
+            }
+
+            if(this.executor == null)
+            {
+                return;
+            }
+
+            this.executor.Start();
+        }
+
+        /// <summary>
+        /// True when an executor exists and reports that it is running.
+        /// </summary>
+        public bool Started
+        {
+            get
+            {
+                return this.executor != null && this.executor.Running;
+            }
+        }
+
+        /// <summary>
+        /// Removes every message from the given channel and hands each one back to
+        /// the executor via ExecuteFirst.  The removed array is reversed first, so the
+        /// last message removed is re-queued first.
+        /// </summary>
+        /// <param name="channel">Channel whose messages are to be re-queued.</param>
+        internal void Redispatch(MessageDispatchChannel channel)
+        {
+            MessageDispatch[] messages = channel.RemoveAll();
+            System.Array.Reverse(messages);
+
+            foreach(MessageDispatch message in messages)
+            {
+                if(Tracer.IsDebugEnabled)
+                {
+                    // The {0} placeholder is required for the dispatch to appear in the log.
+                    Tracer.DebugFormat("Resending Message Dispatch: {0}", message.ToString());
+                }
+                this.executor.ExecuteFirst(message);
+            }
+        }
+
+        /// <summary>
+        /// Hands a message dispatch to the session's executor.  When no executor
+        /// exists the dispatch is ignored.
+        /// </summary>
+        /// <param name="dispatch">Dispatch to queue on the executor.</param>
+        public void Dispatch(MessageDispatch dispatch)
+        {
+            if(this.executor != null)
+            {
+                if(Tracer.IsDebugEnabled)
+                {
+                    // The {0} placeholder is required for the dispatch to appear in the log.
+                    Tracer.DebugFormat("Send Message Dispatch: {0}", dispatch.ToString());
+                }
+                this.executor.Execute(dispatch);
+            }
+        }
+
+        /// <summary>
+        /// Clears the messages held by the executor, if one exists, and then asks
+        /// every registered consumer to clear its own in-progress messages while
+        /// holding the consumer table's SyncRoot lock.
+        /// </summary>
+        internal void ClearMessagesInProgress()
+        {
+            if(this.executor != null)
+            {
+                this.executor.ClearMessagesInProgress();
+            }
+
+            lock(this.consumers.SyncRoot)
+            {
+                // Enumerate the values: enumerating the dictionary itself yields
+                // key/value entries, which cannot be cast to MessageConsumer.
+                foreach(MessageConsumer consumer in this.consumers.Values)
+                {
+                    consumer.ClearMessagesInProgress();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Calls Acknowledge() on every consumer registered with this session while
+        /// holding the consumer table's SyncRoot lock.
+        /// </summary>
+        internal void Acknowledge()
+        {
+            lock(this.consumers.SyncRoot)
+            {
+                foreach(object registered in this.consumers.Values)
+                {
+                    ((MessageConsumer) registered).Acknowledge();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Attaches this session's connection to the message and, for transacted
+        /// sessions, adds a no-op acknowledge handler.
+        /// </summary>
+        /// <param name="message">Message to configure.</param>
+        /// <returns>The same message instance.</returns>
+        private Message ConfigureMessage(Message message)
+        {
+            message.Connection = this.connection;
+
+            if(!this.IsTransacted)
+            {
+                return message;
+            }
+
+            // Allows Acknowledge to be called in a transaction with no effect per JMS Spec.
+            message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
+
+            return message;
+        }
+
+        /// <summary>
+        /// Acknowledge handler that intentionally does nothing.  It keeps a message
+        /// from throwing when a client calls Acknowledge on a message that is part of
+        /// a transaction, whether produced or consumed; the JMS Spec indicates that
+        /// such a call should have no effect.
+        /// </summary>
+        /// <param name="message">
+        /// The <see cref="Message"/> being acknowledged; ignored.
+        /// </param>
+        private void DoNothingAcknowledge(Message message)
+        {
+            // Intentionally empty.
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/SessionExecutor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/SessionExecutor.cs?rev=887420&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/SessionExecutor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/SessionExecutor.cs Fri Dec  4 22:53:41 2009
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using Apache.NMS.Stomp.Commands;
+using Apache.NMS.Stomp.Util;
+using Apache.NMS.Stomp.Threads;
+
+namespace Apache.NMS.Stomp
+{
+    /// <summary>
+    /// Task that queues MessageDispatch commands for a Session and, when iterated
+    /// by its TaskRunner, routes them to the consumers registered in the shared
+    /// consumer table.
+    /// </summary>
+    public class SessionExecutor : Threads.Task
+    {
+        private MessageDispatchChannel messageQueue = new MessageDispatchChannel();
+        private TaskRunner taskRunner = null;
+
+        private Session session = null;
+        private IDictionary consumers = null;
+
+        /// <summary>
+        /// Creates an executor for the given session that looks up consumers in
+        /// the supplied table.
+        /// </summary>
+        public SessionExecutor(Session session, IDictionary consumers)
+        {
+            this.consumers = consumers;
+            this.session = session;
+        }
+
+        /// <summary>
+        /// Best-effort shutdown on finalization; any failure is ignored.
+        /// </summary>
+        ~SessionExecutor()
+        {
+            try
+            {
+                Stop();
+                Close();
+                Clear();
+            }
+            catch
+            {
+            }
+        }
+
+        /// <summary>
+        /// Appends a dispatch to the queue and wakes the task runner.
+        /// </summary>
+        public void Execute(MessageDispatch dispatch)
+        {
+            this.messageQueue.Enqueue(dispatch);
+            this.Wakeup();
+        }
+
+        /// <summary>
+        /// Places a dispatch at the front of the queue and wakes the task runner.
+        /// </summary>
+        public void ExecuteFirst(MessageDispatch dispatch)
+        {
+            this.messageQueue.EnqueueFirst(dispatch);
+            this.Wakeup();
+        }
+
+        /// <summary>
+        /// Wakes the task runner, creating a DedicatedTaskRunner first when none
+        /// exists.  Creation happens under the message queue's SyncRoot lock.
+        /// </summary>
+        public void Wakeup()
+        {
+            TaskRunner runner;
+
+            lock(messageQueue.SyncRoot)
+            {
+                if(this.taskRunner == null)
+                {
+                    this.taskRunner = new DedicatedTaskRunner(this);
+                }
+
+                runner = this.taskRunner;
+            }
+
+            runner.Wakeup();
+        }
+
+        /// <summary>
+        /// Starts the message queue if it is not running and wakes the task runner
+        /// when messages are already waiting.
+        /// </summary>
+        public void Start()
+        {
+            if(messageQueue.Running)
+            {
+                return;
+            }
+
+            messageQueue.Start();
+
+            if(HasUncomsumedMessages)
+            {
+                this.Wakeup();
+            }
+        }
+
+        /// <summary>
+        /// Stops the message queue if it is running and shuts down the current
+        /// task runner, if any.
+        /// </summary>
+        public void Stop()
+        {
+            if(!messageQueue.Running)
+            {
+                return;
+            }
+
+            messageQueue.Stop();
+
+            TaskRunner runner = this.taskRunner;
+            if(runner == null)
+            {
+                return;
+            }
+
+            this.taskRunner = null;
+            runner.Shutdown();
+        }
+
+        /// <summary>
+        /// Closes the message queue.
+        /// </summary>
+        public void Close()
+        {
+            this.messageQueue.Close();
+        }
+
+        /// <summary>
+        /// Delivers a dispatch to the consumer registered under its ConsumerId.
+        /// A dispatch for an unknown consumer is ignored, and any exception is
+        /// logged at debug level and swallowed.
+        /// </summary>
+        public void Dispatch(MessageDispatch dispatch)
+        {
+            try
+            {
+                lock(this.consumers.SyncRoot)
+                {
+                    MessageConsumer target = null;
+
+                    if(this.consumers.Contains(dispatch.ConsumerId))
+                    {
+                        target = this.consumers[dispatch.ConsumerId] as MessageConsumer;
+                    }
+
+                    // If the consumer is not available, just ignore the message.
+                    // Otherwise, dispatch the message to the consumer.
+                    if(target != null)
+                    {
+                        target.Dispatch(dispatch);
+                    }
+                }
+            }
+            catch(Exception ex)
+            {
+                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
+            }
+        }
+
+        /// <summary>
+        /// Performs one unit of work.  Consumers are iterated first; if none of
+        /// them reports work, one message is taken from the session queue and
+        /// dispatched.
+        /// </summary>
+        /// <returns>
+        /// True when a consumer reported work, when the queue still holds messages
+        /// after a dispatch, or when an exception occurred; false otherwise.
+        /// </returns>
+        public bool Iterate()
+        {
+            try
+            {
+                lock(this.consumers.SyncRoot)
+                {
+                    // Deliver any messages queued on the consumer to their listeners.
+                    foreach( MessageConsumer registered in this.consumers.Values )
+                    {
+                        if(registered.Iterate())
+                        {
+                            return true;
+                        }
+                    }
+                }
+
+                // No messages left queued on the listeners.. so now dispatch messages
+                // queued on the session
+                MessageDispatch pending = messageQueue.DequeueNoWait();
+
+                if(pending == null)
+                {
+                    return false;
+                }
+
+                this.Dispatch(pending);
+                return !messageQueue.Empty;
+            }
+            catch(Exception ex)
+            {
+                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
+                this.session.Connection.OnSessionException(this.session, ex);
+            }
+
+            return true;
+        }
+
+        /// <summary>
+        /// Clears the message queue.
+        /// </summary>
+        public void ClearMessagesInProgress()
+        {
+            this.messageQueue.Clear();
+        }
+
+        /// <summary>
+        /// Clears the message queue.
+        /// </summary>
+        public void Clear()
+        {
+            this.messageQueue.Clear();
+        }
+
+        /// <summary>
+        /// Removes and returns every message currently held in the queue.
+        /// </summary>
+        public MessageDispatch[] UnconsumedMessages
+        {
+            get
+            {
+                return messageQueue.RemoveAll();
+            }
+        }
+
+        /// <summary>
+        /// True when the queue is open, running and not empty.
+        /// </summary>
+        public bool HasUncomsumedMessages
+        {
+            get
+            {
+                return !messageQueue.Closed && messageQueue.Running && !messageQueue.Empty;
+            }
+        }
+
+        /// <summary>
+        /// True when the message queue is running.
+        /// </summary>
+        public bool Running
+        {
+            get
+            {
+                return this.messageQueue.Running;
+            }
+        }
+
+        /// <summary>
+        /// True when the message queue holds no messages.
+        /// </summary>
+        public bool Empty
+        {
+            get
+            {
+                return this.messageQueue.Empty;
+            }
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/SessionExecutor.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message