activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r990885 [2/3] - in /activemq/activemq-dotnet: Apache.NMS.ActiveMQ/trunk/src/main/csharp/ Apache.NMS.EMS/trunk/src/main/csharp/ Apache.NMS/trunk/src/main/csharp/
Date Mon, 30 Aug 2010 18:04:21 GMT
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Mon Aug 30 18:04:21 2010
@@ -24,874 +24,884 @@ using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ
 {
-    /// <summary>
-    /// Default provider of ISession
-    /// </summary>
-    public class Session : ISession, IDispatcher
-    {
-        /// <summary>
-        /// Private object used for synchronization, instead of public "this"
-        /// </summary>
-        private readonly object myLock = new object();
-
-        private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
-        private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
-
-        private SessionExecutor executor;
-        private TransactionContext transactionContext;
-        private Connection connection;
-
-        private bool dispatchAsync;
-        private bool exclusive;
-        private bool retroactive;
-        private byte priority = 0;
-
-        private readonly SessionInfo info;
-        private int consumerCounter;
-        private int producerCounter;
-        private long nextDeliveryId;
-        private long lastDeliveredSequenceId;
-        private bool disposed = false;
-        private bool closed = false;
-        private bool closing = false;
-        private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
-        private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
+	/// <summary>
+	/// Default provider of ISession
+	/// </summary>
+	public class Session : ISession, IDispatcher
+	{
+		/// <summary>
+		/// Private object used for synchronization, instead of public "this"
+		/// </summary>
+		private readonly object myLock = new object();
+
+		private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+
+		private SessionExecutor executor;
+		private TransactionContext transactionContext;
+		private Connection connection;
+
+		private bool dispatchAsync;
+		private bool exclusive;
+		private bool retroactive;
+		private byte priority = 0;
+
+		private readonly SessionInfo info;
+		private int consumerCounter;
+		private int producerCounter;
+		private long nextDeliveryId;
+		private long lastDeliveredSequenceId;
+		private bool disposed = false;
+		private bool closed = false;
+		private bool closing = false;
+		private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
+		private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
 		private TimeSpan requestTimeout;
-        private AcknowledgementMode acknowledgementMode;
+		private AcknowledgementMode acknowledgementMode;
+
+		public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode, bool dispatchAsync)
+		{
+			this.connection = connection;
+			this.info = info;
+			this.acknowledgementMode = acknowledgementMode;
+			this.requestTimeout = connection.RequestTimeout;
+			this.dispatchAsync = dispatchAsync;
+
+			if(acknowledgementMode == AcknowledgementMode.Transactional)
+			{
+				this.transactionContext = new TransactionContext(this);
+			}
+
+			this.executor = new SessionExecutor(this, this.consumers);
+		}
+
+		~Session()
+		{
+			Dispose(false);
+		}
+
+		#region Property Accessors
+
+		/// <summary>
+		/// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
+		/// until acknowledgements are received.
+		/// </summary>
+		public int PrefetchSize
+		{
+			set{ this.connection.PrefetchPolicy.SetAll(value); }
+		}
+
+		/// <summary>
+		/// Sets the maximum number of messages to keep around per consumer
+		/// in addition to the prefetch window for non-durable topics until messages
+		/// will start to be evicted for slow consumers.
+		/// Must be > 0 to enable this feature
+		/// </summary>
+		public int MaximumPendingMessageLimit
+		{
+			set{ this.connection.PrefetchPolicy.MaximumPendingMessageLimit = value; }
+		}
+
+		/// <summary>
+		/// Enables or disables whether asynchronous dispatch should be used by the broker
+		/// </summary>
+		public bool DispatchAsync
+		{
+			get{ return this.dispatchAsync; }
+			set{ this.dispatchAsync = value; }
+		}
+
+		/// <summary>
+		/// Enables or disables exclusive consumers when using queues. An exclusive consumer means
+		/// only one instance of a consumer is allowed to process messages on a queue to preserve order
+		/// </summary>
+		public bool Exclusive
+		{
+			get{ return this.exclusive; }
+			set{ this.exclusive = value; }
+		}
+
+		/// <summary>
+		/// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
+		/// </summary>
+		public bool Retroactive
+		{
+			get{ return this.retroactive; }
+			set{ this.retroactive = value; }
+		}
+
+		/// <summary>
+		/// Sets the default consumer priority for consumers
+		/// </summary>
+		public byte Priority
+		{
+			get{ return this.priority; }
+			set{ this.priority = value; }
+		}
+
+		public Connection Connection
+		{
+			get { return this.connection; }
+		}
+
+		public SessionId SessionId
+		{
+			get { return info.SessionId; }
+		}
+
+		public TransactionContext TransactionContext
+		{
+			get { return this.transactionContext; }
+		}
+
+		public TimeSpan RequestTimeout
+		{
+			get { return this.requestTimeout; }
+			set { this.requestTimeout = value; }
+		}
+
+		public bool Transacted
+		{
+			get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
+		}
+
+		public AcknowledgementMode AcknowledgementMode
+		{
+			get { return this.acknowledgementMode; }
+		}
+
+		public bool IsClientAcknowledge
+		{
+			get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; }
+		}
+
+		public bool IsAutoAcknowledge
+		{
+			get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; }
+		}
+
+		public bool IsDupsOkAcknowledge
+		{
+			get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; }
+		}
+
+		public bool IsIndividualAcknowledge
+		{
+			get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; }
+		}
+
+		public bool IsTransacted
+		{
+			get { return this.acknowledgementMode == AcknowledgementMode.Transactional; }
+		}
+
+		public SessionExecutor Executor
+		{
+			get { return this.executor; }
+		}
+
+		public long NextDeliveryId
+		{
+			get { return Interlocked.Increment(ref this.nextDeliveryId); }
+		}
+
+		public long DisposeStopTimeout
+		{
+			get { return (long) this.disposeStopTimeout.TotalMilliseconds; }
+			set { this.disposeStopTimeout = TimeSpan.FromMilliseconds(value); }
+		}
+
+		public long CloseStopTimeout
+		{
+			get { return (long) this.closeStopTimeout.TotalMilliseconds; }
+			set { this.closeStopTimeout = TimeSpan.FromMilliseconds(value); }
+		}
+
+		private ConsumerTransformerDelegate consumerTransformer;
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.
+		/// The Session instance sets the delegate on each Consumer it creates.
+		/// </summary>
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		private ProducerTransformerDelegate producerTransformer;
+		/// <summary>
+		/// A delegate that is called each time a Message is sent from this Producer which allows
+		/// the application to perform any needed transformations on the Message before it is sent.
+		/// The Session instance sets the delegate on each Producer it creates.
+		/// </summary>
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
+
+		#endregion
+
+		#region ISession Members
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected void Dispose(bool disposing)
+		{
+			if(this.disposed)
+			{
+				return;
+			}
+
+			if(disposing)
+			{
+				// Dispose managed code here.
+			}
+
+			try
+			{
+				// Force a Stop when we are Disposing vs a Normal Close.
+				this.executor.Stop(this.disposeStopTimeout);
+
+				Close();
+			}
+			catch
+			{
+				// Ignore network errors.
+			}
+
+			this.disposed = true;
+		}
+
+		public void Close()
+		{
+			lock(myLock)
+			{
+				if(this.closed)
+				{
+					return;
+				}
+
+				try
+				{
+					Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId.ToString());
+					DoClose();
+					Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId.ToString());
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Error during session close: {0}", ex);
+				}
+				finally
+				{
+					// Make sure we attempt to inform the broker this Session is done.
+					RemoveInfo info = new RemoveInfo();
+					info.ObjectId = this.info.SessionId;
+					info.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
+					this.connection.Oneway(info);
+					this.connection = null;
+					this.closed = true;
+					this.closing = false;
+				}
+			}
+		}
+
+		internal void DoClose()
+		{
+			lock(myLock)
+			{
+				if(this.closed)
+				{
+					return;
+				}
+
+				try
+				{
+					this.closing = true;
+
+					// Stop all message deliveries from this Session
+					this.executor.Stop(this.closeStopTimeout);
+
+					lock(consumers.SyncRoot)
+					{
+						foreach(MessageConsumer consumer in consumers.Values)
+						{
+							consumer.DoClose();
+							this.lastDeliveredSequenceId =
+								Math.Min(this.lastDeliveredSequenceId, consumer.LastDeliveredSequenceId);
+						}
+					}
+					consumers.Clear();
+
+					lock(producers.SyncRoot)
+					{
+						foreach(MessageProducer producer in producers.Values)
+						{
+							producer.DoClose();
+						}
+					}
+					producers.Clear();
+
+					// If in a transaction roll it back
+					if(this.IsTransacted && this.transactionContext.InTransaction)
+					{
+						try
+						{
+							this.transactionContext.Rollback();
+						}
+						catch
+						{
+						}
+					}
+
+					Connection.RemoveSession(this);
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Error during session close: {0}", ex);
+				}
+				finally
+				{
+					this.closed = true;
+					this.closing = false;
+				}
+			}
+		}
+
+		public IMessageProducer CreateProducer()
+		{
+			return CreateProducer(null);
+		}
+
+		public IMessageProducer CreateProducer(IDestination destination)
+		{
+			MessageProducer producer = null;
+
+			try
+			{
+				ActiveMQDestination dest = null;
+				if(destination != null)
+				{
+					dest = ActiveMQDestination.Transform(destination);
+				}
+
+				producer = new MessageProducer(this, GetNextProducerId(), dest, this.RequestTimeout);
+
+				producer.ProducerTransformer = this.ProducerTransformer;
+
+				this.AddProducer(producer);
+				this.Connection.Oneway(producer.ProducerInfo);
+			}
+			catch(Exception)
+			{
+				if(producer != null)
+				{
+					this.RemoveProducer(producer.ProducerId);
+					producer.Close();
+				}
+
+				throw;
+			}
+
+			return producer;
+		}
+
+		public IMessageConsumer CreateConsumer(IDestination destination)
+		{
+			return CreateConsumer(destination, null, false);
+		}
+
+		public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+		{
+			return CreateConsumer(destination, selector, false);
+		}
+
+		public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+		{
+			if(destination == null)
+			{
+				throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+			}
+
+			ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
+			int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch;
+
+			if(dest is ITopic || dest is ITemporaryTopic)
+			{
+				prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
+			}
+			else if(dest is IQueue || dest is ITemporaryQueue)
+			{
+				prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
+			}
+
+			MessageConsumer consumer = null;
+
+			try
+			{
+				consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector, prefetchSize,
+											   this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
+											   noLocal, false, this.connection.DispatchAsync);
+
+				consumer.ConsumerTransformer = this.ConsumerTransformer;
+
+				this.AddConsumer(consumer);
+				this.Connection.SyncRequest(consumer.ConsumerInfo);
+
+				if(this.Connection.IsStarted)
+				{
+					consumer.Start();
+				}
+			}
+			catch(Exception)
+			{
+				if(consumer != null)
+				{
+					this.RemoveConsumer(consumer.ConsumerId);
+					consumer.Close();
+				}
+
+				throw;
+			}
+
+			return consumer;
+		}
+
+		public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+		{
+			if(destination == null)
+			{
+				throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+			}
+
+			ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
+			MessageConsumer consumer = null;
+
+			try
+			{
+				consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector,
+											   this.connection.PrefetchPolicy.DurableTopicPrefetch,
+											   this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
+											   noLocal, false, this.connection.DispatchAsync);
 
-        public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode, bool dispatchAsync)
-        {
-            this.connection = connection;
-            this.info = info;
-            this.acknowledgementMode = acknowledgementMode;
-            this.requestTimeout = connection.RequestTimeout;
-            this.dispatchAsync = dispatchAsync;
-
-            if(acknowledgementMode == AcknowledgementMode.Transactional)
-            {
-                this.transactionContext = new TransactionContext(this);
-            }
-
-            this.executor = new SessionExecutor(this, this.consumers);
-        }
-
-        ~Session()
-        {
-            Dispose(false);
-        }
-
-        #region Property Accessors
-
-        /// <summary>
-        /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
-        /// until acknowledgements are received.
-        /// </summary>
-        public int PrefetchSize
-        {
-            set{ this.connection.PrefetchPolicy.SetAll(value); }
-        }
-
-        /// <summary>
-        /// Sets the maximum number of messages to keep around per consumer
-        /// in addition to the prefetch window for non-durable topics until messages
-        /// will start to be evicted for slow consumers.
-        /// Must be > 0 to enable this feature
-        /// </summary>
-        public int MaximumPendingMessageLimit
-        {
-            set{ this.connection.PrefetchPolicy.MaximumPendingMessageLimit = value; }
-        }
-
-        /// <summary>
-        /// Enables or disables whether asynchronous dispatch should be used by the broker
-        /// </summary>
-        public bool DispatchAsync
-        {
-            get{ return this.dispatchAsync; }
-            set{ this.dispatchAsync = value; }
-        }
-
-        /// <summary>
-        /// Enables or disables exclusive consumers when using queues. An exclusive consumer means
-        /// only one instance of a consumer is allowed to process messages on a queue to preserve order
-        /// </summary>
-        public bool Exclusive
-        {
-            get{ return this.exclusive; }
-            set{ this.exclusive = value; }
-        }
-
-        /// <summary>
-        /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
-        /// </summary>
-        public bool Retroactive
-        {
-            get{ return this.retroactive; }
-            set{ this.retroactive = value; }
-        }
-
-        /// <summary>
-        /// Sets the default consumer priority for consumers
-        /// </summary>
-        public byte Priority
-        {
-            get{ return this.priority; }
-            set{ this.priority = value; }
-        }
-
-        public Connection Connection
-        {
-            get { return this.connection; }
-        }
-
-        public SessionId SessionId
-        {
-            get { return info.SessionId; }
-        }
-
-        public TransactionContext TransactionContext
-        {
-            get { return this.transactionContext; }
-        }
-
-        public TimeSpan RequestTimeout
-        {
-            get { return this.requestTimeout; }
-            set { this.requestTimeout = value; }
-        }
-
-        public bool Transacted
-        {
-            get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
-        }
-
-        public AcknowledgementMode AcknowledgementMode
-        {
-            get { return this.acknowledgementMode; }
-        }
-
-        public bool IsClientAcknowledge
-        {
-            get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; }
-        }
-
-        public bool IsAutoAcknowledge
-        {
-            get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; }
-        }
-
-        public bool IsDupsOkAcknowledge
-        {
-            get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; }
-        }
-
-        public bool IsIndividualAcknowledge
-        {
-            get { return this.acknowledgementMode == AcknowledgementMode.IndividualAcknowledge; }
-        }
-
-        public bool IsTransacted
-        {
-            get { return this.acknowledgementMode == AcknowledgementMode.Transactional; }
-        }
-
-        public SessionExecutor Executor
-        {
-            get { return this.executor; }
-        }
-
-        public long NextDeliveryId
-        {
-            get { return Interlocked.Increment(ref this.nextDeliveryId); }
-        }
-
-        public long DisposeStopTimeout
-        {
-            get { return (long) this.disposeStopTimeout.TotalMilliseconds; }
-            set { this.disposeStopTimeout = TimeSpan.FromMilliseconds(value); }
-        }
-
-        public long CloseStopTimeout
-        {
-            get { return (long) this.closeStopTimeout.TotalMilliseconds; }
-            set { this.closeStopTimeout = TimeSpan.FromMilliseconds(value); }
-        }
-
-        private ConsumerTransformerDelegate consumerTransformer;
-        public ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get { return this.consumerTransformer; }
-            set { this.consumerTransformer = value; }
-        }
-
-        private ProducerTransformerDelegate producerTransformer;
-        public ProducerTransformerDelegate ProducerTransformer
-        {
-            get { return this.producerTransformer; }
-            set { this.producerTransformer = value; }
-        }
-
-        #endregion
-
-        #region ISession Members
-
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        protected void Dispose(bool disposing)
-        {
-            if(this.disposed)
-            {
-                return;
-            }
-
-            if(disposing)
-            {
-                // Dispose managed code here.
-            }
-
-            try
-            {
-                // Force a Stop when we are Disposing vs a Normal Close.
-                this.executor.Stop(this.disposeStopTimeout);
-
-                Close();
-            }
-            catch
-            {
-                // Ignore network errors.
-            }
-
-            this.disposed = true;
-        }
-
-        public void Close()
-        {
-            lock(myLock)
-            {
-                if(this.closed)
-                {
-                    return;
-                }
-
-                try
-                {
-                    Tracer.InfoFormat("Closing The Session with Id {0}", this.info.SessionId.ToString());
-                    DoClose();
-                    Tracer.InfoFormat("Closed The Session with Id {0}", this.info.SessionId.ToString());
-                }
-                catch(Exception ex)
-                {
-                    Tracer.ErrorFormat("Error during session close: {0}", ex);
-                }
-                finally
-                {
-                    // Make sure we attempt to inform the broker this Session is done.
-                    RemoveInfo info = new RemoveInfo();
-                    info.ObjectId = this.info.SessionId;
-                    info.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
-                    this.connection.Oneway(info);
-                    this.connection = null;
-                    this.closed = true;
-                    this.closing = false;
-                }
-            }
-        }
-
-        internal void DoClose()
-        {
-            lock(myLock)
-            {
-                if(this.closed)
-                {
-                    return;
-                }
-
-                try
-                {
-                    this.closing = true;
-
-                    // Stop all message deliveries from this Session
-                    this.executor.Stop(this.closeStopTimeout);
-
-                    lock(consumers.SyncRoot)
-                    {
-                        foreach(MessageConsumer consumer in consumers.Values)
-                        {
-                            consumer.DoClose();
-                            this.lastDeliveredSequenceId =
-                                Math.Min(this.lastDeliveredSequenceId, consumer.LastDeliveredSequenceId);
-                        }
-                    }
-                    consumers.Clear();
-
-                    lock(producers.SyncRoot)
-                    {
-                        foreach(MessageProducer producer in producers.Values)
-                        {
-                            producer.DoClose();
-                        }
-                    }
-                    producers.Clear();
-
-                    // If in a transaction roll it back
-                    if(this.IsTransacted && this.transactionContext.InTransaction)
-                    {
-                        try
-                        {
-                            this.transactionContext.Rollback();
-                        }
-                        catch
-                        {
-                        }
-                    }
-
-                    Connection.RemoveSession(this);
-                }
-                catch(Exception ex)
-                {
-                    Tracer.ErrorFormat("Error during session close: {0}", ex);
-                }
-                finally
-                {
-                    this.closed = true;
-                    this.closing = false;
-                }
-            }
-        }
-
-        public IMessageProducer CreateProducer()
-        {
-            return CreateProducer(null);
-        }
-
-        public IMessageProducer CreateProducer(IDestination destination)
-        {
-            MessageProducer producer = null;
-
-            try
-            {
-                ActiveMQDestination dest = null;
-                if(destination != null)
-                {
-                    dest = ActiveMQDestination.Transform(destination);
-                }
-
-                producer = new MessageProducer(this, GetNextProducerId(), dest, this.RequestTimeout);
-
-                producer.ProducerTransformer = this.ProducerTransformer;
-
-                this.AddProducer(producer);
-                this.Connection.Oneway(producer.ProducerInfo);
-            }
-            catch(Exception)
-            {
-                if(producer != null)
-                {
-                    this.RemoveProducer(producer.ProducerId);
-                    producer.Close();
-                }
-
-                throw;
-            }
-
-            return producer;
-        }
-
-        public IMessageConsumer CreateConsumer(IDestination destination)
-        {
-            return CreateConsumer(destination, null, false);
-        }
-
-        public IMessageConsumer CreateConsumer(IDestination destination, string selector)
-        {
-            return CreateConsumer(destination, selector, false);
-        }
-
-        public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
-        {
-            if(destination == null)
-            {
-                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
-            }
-
-            ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
-            int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch;
-
-            if(dest is ITopic || dest is ITemporaryTopic)
-            {
-                prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
-            }
-            else if(dest is IQueue || dest is ITemporaryQueue)
-            {
-                prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
-            }
-
-            MessageConsumer consumer = null;
-
-            try
-            {
-                consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector, prefetchSize,
-                                               this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
-                                               noLocal, false, this.connection.DispatchAsync);
-				
 				consumer.ConsumerTransformer = this.ConsumerTransformer;
 
-                this.AddConsumer(consumer);
-                this.Connection.SyncRequest(consumer.ConsumerInfo);
+				this.AddConsumer(consumer);
+				this.Connection.SyncRequest(consumer.ConsumerInfo);
 
-                if(this.Connection.IsStarted)
-                {
-                    consumer.Start();
-                }
-            }
-            catch(Exception)
-            {
-                if(consumer != null)
-                {
-                    this.RemoveConsumer(consumer.ConsumerId);
-                    consumer.Close();
-                }
-
-                throw;
-            }
-
-            return consumer;
-        }
-
-        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
-        {
-            if(destination == null)
-            {
-                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
-            }
-
-            ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
-            MessageConsumer consumer = null;
-
-            try
-            {
-                consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector,
-                                               this.connection.PrefetchPolicy.DurableTopicPrefetch,
-                                               this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
-                                               noLocal, false, this.connection.DispatchAsync);
-
-                consumer.ConsumerTransformer = this.ConsumerTransformer;
-			
-                this.AddConsumer(consumer);
-                this.Connection.SyncRequest(consumer.ConsumerInfo);
-
-                if(this.Connection.IsStarted)
-                {
-                    consumer.Start();
-                }
-            }
-            catch(Exception)
-            {
-                if(consumer != null)
-                {
-                    this.RemoveConsumer(consumer.ConsumerId);
-                    consumer.Close();
-                }
-
-                throw;
-            }
-
-            return consumer;
-        }
-
-        public void DeleteDurableConsumer(string name)
-        {
-            RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
-            command.ConnectionId = Connection.ConnectionId;
-            command.ClientId = Connection.ClientId;
-            command.SubcriptionName = name;
-            this.connection.SyncRequest(command);
-        }
-
-        public IQueueBrowser CreateBrowser(IQueue queue)
-        {
-            return this.CreateBrowser(queue, null);
-        }
-
-        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
-        {
-            if(queue == null)
-            {
-                throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
-            }
-
-            ActiveMQDestination dest = ActiveMQDestination.Transform(queue);
-            QueueBrowser browser = null;
-
-            try
-            {
-                browser = new QueueBrowser(this, GetNextConsumerId(), dest, selector, this.DispatchAsync);
-            }
-            catch(Exception)
-            {
-                if(browser != null)
-                {
-                    browser.Close();
-                }
-
-                throw;
-            }
-
-            return browser;
-        }
-
-        public IQueue GetQueue(string name)
-        {
-            return new ActiveMQQueue(name);
-        }
-
-        public ITopic GetTopic(string name)
-        {
-            return new ActiveMQTopic(name);
-        }
-
-        public ITemporaryQueue CreateTemporaryQueue()
-        {
-            return (ITemporaryQueue)this.connection.CreateTemporaryDestination(false);
-        }
-
-        public ITemporaryTopic CreateTemporaryTopic()
-        {
-            return (ITemporaryTopic)this.connection.CreateTemporaryDestination(true);
-        }
-
-        /// <summary>
-        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
-        /// </summary>
-        public void DeleteDestination(IDestination destination)
-        {
-            this.connection.DeleteDestination(destination);
-        }
-
-        public IMessage CreateMessage()
-        {
-            ActiveMQMessage answer = new ActiveMQMessage();
-            return ConfigureMessage(answer) as IMessage;
-        }
-
-        public ITextMessage CreateTextMessage()
-        {
-            ActiveMQTextMessage answer = new ActiveMQTextMessage();
-            return ConfigureMessage(answer) as ITextMessage;
-        }
-
-        public ITextMessage CreateTextMessage(string text)
-        {
-            ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
-            return ConfigureMessage(answer) as ITextMessage;
-        }
-
-        public IMapMessage CreateMapMessage()
-        {
-            return ConfigureMessage(new ActiveMQMapMessage()) as IMapMessage;
-        }
-
-        public IBytesMessage CreateBytesMessage()
-        {
-            return ConfigureMessage(new ActiveMQBytesMessage()) as IBytesMessage;
-        }
-
-        public IBytesMessage CreateBytesMessage(byte[] body)
-        {
-            ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
-            answer.Content = body;
-            return ConfigureMessage(answer) as IBytesMessage;
-        }
-
-        public IStreamMessage CreateStreamMessage()
-        {
-            return ConfigureMessage(new ActiveMQStreamMessage()) as IStreamMessage;
-        }
-
-        public IObjectMessage CreateObjectMessage(object body)
-        {
-            ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
-            answer.Body = body;
-            return ConfigureMessage(answer) as IObjectMessage;
-        }
-
-        public void Commit()
-        {
-            if(!Transacted)
-            {
-                throw new InvalidOperationException(
-                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
-                        + this.AcknowledgementMode);
-            }
-
-            this.TransactionContext.Commit();
-        }
-
-        public void Rollback()
-        {
-            if(!Transacted)
-            {
-                throw new InvalidOperationException(
-                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
-                        + this.AcknowledgementMode);
-            }
-
-            this.TransactionContext.Rollback();
-        }
-
-        #endregion
-
-        public void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout )
-        {
-            ActiveMQMessage msg = message;
-
-            if(Transacted)
-            {
-                DoStartTransaction();
-                msg.TransactionId = TransactionContext.TransactionId;
-            }
-
-            msg.RedeliveryCounter = 0;
-            msg.BrokerPath = null;
-
-            if(this.connection.CopyMessageOnSend)
-            {
-                msg = (ActiveMQMessage)msg.Clone();
-            }
-
-            msg.OnSend();
-            msg.ProducerId = msg.MessageId.ProducerId;
-
-            if(sendTimeout.TotalMilliseconds <= 0 && !msg.ResponseRequired && !connection.AlwaysSyncSend &&
-               (!msg.Persistent || connection.AsyncSend || msg.TransactionId != null))
-            {
-                this.connection.Oneway(msg);
-
-                if(producerWindow != null)
-                {
-                    // Since we defer lots of the marshaling till we hit the wire, this
-                    // might not provide and accurate size. We may change over to doing
-                    // more aggressive marshaling, to get more accurate sizes.. this is more
-                    // important once users start using producer window flow control.
-                    producerWindow.IncreaseUsage(msg.Size());
-                }
-            }
-            else
-            {
-                if(sendTimeout.TotalMilliseconds > 0)
-                {
-                    this.connection.SyncRequest(msg, sendTimeout);
-                }
-                else
-                {
-                    this.connection.SyncRequest(msg);
-                }
-            }
-        }
-
-        /// <summary>
-        /// Ensures that a transaction is started
-        /// </summary>
-        public void DoStartTransaction()
-        {
-            if(Transacted)
-            {
-                this.TransactionContext.Begin();
-            }
-        }
-
-        public void AddConsumer(MessageConsumer consumer)
-        {
-            ConsumerId id = consumer.ConsumerId;
-
-            // Registered with Connection before we register at the broker.
-            consumers[id] = consumer;
-            connection.addDispatcher(id, this);
-        }
-
-        public void RemoveConsumer(ConsumerId objectId)
-        {
-            connection.removeDispatcher(objectId);
-            if(!this.closing)
-            {
-                consumers.Remove(objectId);
-            }
-        }
-
-        public void AddProducer(MessageProducer producer)
-        {
-            ProducerId id = producer.ProducerId;
-
-            this.producers[id] = producer;
-            this.connection.addProducer(id, producer);
-        }
-
-        public void RemoveProducer(ProducerId objectId)
-        {
-            connection.removeProducer(objectId);
-            if(!this.closing)
-            {
-                producers.Remove(objectId);
-            }
-        }
-
-        public ConsumerId GetNextConsumerId()
-        {
-            ConsumerId id = new ConsumerId();
-            id.ConnectionId = info.SessionId.ConnectionId;
-            id.SessionId = info.SessionId.Value;
-            id.Value = Interlocked.Increment(ref consumerCounter);
-
-            return id;
-        }
-
-        public ProducerId GetNextProducerId()
-        {
-            ProducerId id = new ProducerId();
-            id.ConnectionId = info.SessionId.ConnectionId;
-            id.SessionId = info.SessionId.Value;
-            id.Value = Interlocked.Increment(ref producerCounter);
-
-            return id;
-        }
-
-        public void Stop()
-        {
-            if(this.executor != null)
-            {
-                this.executor.Stop();
-            }
-        }
-
-        public void Start()
-        {
-            foreach(MessageConsumer consumer in this.consumers.Values)
-            {
-                consumer.Start();
-            }
-
-            if(this.executor != null)
-            {
-                this.executor.Start();
-            }
-        }
-
-        public bool Started
-        {
-            get
-            {
-                return this.executor != null ? this.executor.Running : false;
-            }
-        }
-
-        internal void Redispatch(MessageDispatchChannel channel)
-        {
-            MessageDispatch[] messages = channel.RemoveAll();
-            System.Array.Reverse(messages);
-
-            foreach(MessageDispatch message in messages)
-            {
-                this.executor.ExecuteFirst(message);
-            }
-        }
-
-        public void Dispatch(MessageDispatch dispatch)
-        {
-            if(this.executor != null)
-            {
-                this.executor.Execute(dispatch);
-            }
-        }
-
-        internal void ClearMessagesInProgress()
-        {
-            if( this.executor != null ) {
-                this.executor.ClearMessagesInProgress();
-            }
-
-            // Because we are called from inside the Transport Reconnection logic
-            // we spawn the Consumer clear to another Thread so that we can avoid
-            // any lock contention that might exist between the consumer and the
-            // connection that is reconnecting.
-            lock(this.consumers.SyncRoot)
-            {
-                foreach(MessageConsumer consumer in this.consumers.Values)
-                {
-                    consumer.InProgressClearRequired();
-                    ThreadPool.QueueUserWorkItem(ClearMessages, consumer);
-                }
-            }
-        }
-
-        private void ClearMessages(object value)
-        {
-            MessageConsumer consumer = value as MessageConsumer;
-
-            if(Tracer.IsDebugEnabled)
-            {
-                Tracer.Debug("Performing Async Clear of In Progress Messages on Consumer: " + consumer.ConsumerId);
-            }
-
-            consumer.ClearMessagesInProgress();
-        }
-
-        internal void Acknowledge()
-        {
-            lock(this.consumers.SyncRoot)
-            {
-                foreach(MessageConsumer consumer in this.consumers.Values)
-                {
-                    consumer.Acknowledge();
-                }
-            }
-        }
-
-        private ActiveMQMessage ConfigureMessage(ActiveMQMessage message)
-        {
-            message.Connection = this.connection;
-
-            if(this.IsTransacted)
-            {
-                // Allows Acknowledge to be called in a transaction with no effect per JMS Spec.
-                message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
-            }
-
-            return message;
-        }
-
-        internal void SendAck(MessageAck ack)
-        {
-            this.SendAck(ack, false);
-        }
-
-        internal void SendAck(MessageAck ack, bool lazy)
-        {
-            if(lazy || connection.SendAcksAsync || this.IsTransacted )
-            {
-                this.connection.Oneway(ack);
-            }
-            else
-            {
-                this.connection.SyncRequest(ack);
-            }
-        }
-
-        /// <summary>
-        /// Prevents message from throwing an exception if a client calls Acknoweldge on
-        /// a message that is part of a transaction either being produced or consumed.  The
-        /// JMS Spec indicates that users should be able to call Acknowledge with no effect
-        /// if the message is in a transaction.
-        /// </summary>
-        /// <param name="message">
-        /// A <see cref="ActiveMQMessage"/>
-        /// </param>
-        private void DoNothingAcknowledge(ActiveMQMessage message)
-        {
-        }
+				if(this.Connection.IsStarted)
+				{
+					consumer.Start();
+				}
+			}
+			catch(Exception)
+			{
+				if(consumer != null)
+				{
+					this.RemoveConsumer(consumer.ConsumerId);
+					consumer.Close();
+				}
+
+				throw;
+			}
+
+			return consumer;
+		}
+
+		public void DeleteDurableConsumer(string name)
+		{
+			RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
+			command.ConnectionId = Connection.ConnectionId;
+			command.ClientId = Connection.ClientId;
+			command.SubcriptionName = name;
+			this.connection.SyncRequest(command);
+		}
+
+		public IQueueBrowser CreateBrowser(IQueue queue)
+		{
+			return this.CreateBrowser(queue, null);
+		}
+
+		public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+		{
+			if(queue == null)
+			{
+				throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+			}
+
+			ActiveMQDestination dest = ActiveMQDestination.Transform(queue);
+			QueueBrowser browser = null;
+
+			try
+			{
+				browser = new QueueBrowser(this, GetNextConsumerId(), dest, selector, this.DispatchAsync);
+			}
+			catch(Exception)
+			{
+				if(browser != null)
+				{
+					browser.Close();
+				}
+
+				throw;
+			}
+
+			return browser;
+		}
+
+		public IQueue GetQueue(string name)
+		{
+			return new ActiveMQQueue(name);
+		}
+
+		public ITopic GetTopic(string name)
+		{
+			return new ActiveMQTopic(name);
+		}
+
+		public ITemporaryQueue CreateTemporaryQueue()
+		{
+			return (ITemporaryQueue)this.connection.CreateTemporaryDestination(false);
+		}
+
+		public ITemporaryTopic CreateTemporaryTopic()
+		{
+			return (ITemporaryTopic)this.connection.CreateTemporaryDestination(true);
+		}
+
+		/// <summary>
+		/// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+		/// </summary>
+		public void DeleteDestination(IDestination destination)
+		{
+			this.connection.DeleteDestination(destination);
+		}
+
+		public IMessage CreateMessage()
+		{
+			ActiveMQMessage answer = new ActiveMQMessage();
+			return ConfigureMessage(answer) as IMessage;
+		}
+
+		public ITextMessage CreateTextMessage()
+		{
+			ActiveMQTextMessage answer = new ActiveMQTextMessage();
+			return ConfigureMessage(answer) as ITextMessage;
+		}
+
+		public ITextMessage CreateTextMessage(string text)
+		{
+			ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
+			return ConfigureMessage(answer) as ITextMessage;
+		}
+
+		public IMapMessage CreateMapMessage()
+		{
+			return ConfigureMessage(new ActiveMQMapMessage()) as IMapMessage;
+		}
+
+		public IBytesMessage CreateBytesMessage()
+		{
+			return ConfigureMessage(new ActiveMQBytesMessage()) as IBytesMessage;
+		}
+
+		public IBytesMessage CreateBytesMessage(byte[] body)
+		{
+			ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
+			answer.Content = body;
+			return ConfigureMessage(answer) as IBytesMessage;
+		}
+
+		public IStreamMessage CreateStreamMessage()
+		{
+			return ConfigureMessage(new ActiveMQStreamMessage()) as IStreamMessage;
+		}
+
+		public IObjectMessage CreateObjectMessage(object body)
+		{
+			ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
+			answer.Body = body;
+			return ConfigureMessage(answer) as IObjectMessage;
+		}
+
+		public void Commit()
+		{
+			if(!Transacted)
+			{
+				throw new InvalidOperationException(
+						"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+						+ this.AcknowledgementMode);
+			}
+
+			this.TransactionContext.Commit();
+		}
+
+		public void Rollback()
+		{
+			if(!Transacted)
+			{
+				throw new InvalidOperationException(
+						"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+						+ this.AcknowledgementMode);
+			}
+
+			this.TransactionContext.Rollback();
+		}
+
+		#endregion
+
+		public void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout )
+		{
+			ActiveMQMessage msg = message;
+
+			if(Transacted)
+			{
+				DoStartTransaction();
+				msg.TransactionId = TransactionContext.TransactionId;
+			}
+
+			msg.RedeliveryCounter = 0;
+			msg.BrokerPath = null;
+
+			if(this.connection.CopyMessageOnSend)
+			{
+				msg = (ActiveMQMessage)msg.Clone();
+			}
+
+			msg.OnSend();
+			msg.ProducerId = msg.MessageId.ProducerId;
+
+			if(sendTimeout.TotalMilliseconds <= 0 && !msg.ResponseRequired && !connection.AlwaysSyncSend &&
+			   (!msg.Persistent || connection.AsyncSend || msg.TransactionId != null))
+			{
+				this.connection.Oneway(msg);
+
+				if(producerWindow != null)
+				{
+					// Since we defer lots of the marshaling till we hit the wire, this
+					// might not provide an accurate size. We may change over to doing
+					// more aggressive marshaling, to get more accurate sizes.. this is more
+					// important once users start using producer window flow control.
+					producerWindow.IncreaseUsage(msg.Size());
+				}
+			}
+			else
+			{
+				if(sendTimeout.TotalMilliseconds > 0)
+				{
+					this.connection.SyncRequest(msg, sendTimeout);
+				}
+				else
+				{
+					this.connection.SyncRequest(msg);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Ensures that a transaction is started
+		/// </summary>
+		public void DoStartTransaction()
+		{
+			if(Transacted)
+			{
+				this.TransactionContext.Begin();
+			}
+		}
+
+		public void AddConsumer(MessageConsumer consumer)
+		{
+			ConsumerId id = consumer.ConsumerId;
+
+			// Registered with Connection before we register at the broker.
+			consumers[id] = consumer;
+			connection.addDispatcher(id, this);
+		}
+
+		public void RemoveConsumer(ConsumerId objectId)
+		{
+			connection.removeDispatcher(objectId);
+			if(!this.closing)
+			{
+				consumers.Remove(objectId);
+			}
+		}
+
+		public void AddProducer(MessageProducer producer)
+		{
+			ProducerId id = producer.ProducerId;
+
+			this.producers[id] = producer;
+			this.connection.addProducer(id, producer);
+		}
+
+		public void RemoveProducer(ProducerId objectId)
+		{
+			connection.removeProducer(objectId);
+			if(!this.closing)
+			{
+				producers.Remove(objectId);
+			}
+		}
+
+		public ConsumerId GetNextConsumerId()
+		{
+			ConsumerId id = new ConsumerId();
+			id.ConnectionId = info.SessionId.ConnectionId;
+			id.SessionId = info.SessionId.Value;
+			id.Value = Interlocked.Increment(ref consumerCounter);
+
+			return id;
+		}
+
+		public ProducerId GetNextProducerId()
+		{
+			ProducerId id = new ProducerId();
+			id.ConnectionId = info.SessionId.ConnectionId;
+			id.SessionId = info.SessionId.Value;
+			id.Value = Interlocked.Increment(ref producerCounter);
+
+			return id;
+		}
+
+		public void Stop()
+		{
+			if(this.executor != null)
+			{
+				this.executor.Stop();
+			}
+		}
+
+		public void Start()
+		{
+			foreach(MessageConsumer consumer in this.consumers.Values)
+			{
+				consumer.Start();
+			}
+
+			if(this.executor != null)
+			{
+				this.executor.Start();
+			}
+		}
+
+		public bool Started
+		{
+			get
+			{
+				return this.executor != null ? this.executor.Running : false;
+			}
+		}
+
+		internal void Redispatch(MessageDispatchChannel channel)
+		{
+			MessageDispatch[] messages = channel.RemoveAll();
+			System.Array.Reverse(messages);
+
+			foreach(MessageDispatch message in messages)
+			{
+				this.executor.ExecuteFirst(message);
+			}
+		}
+
+		public void Dispatch(MessageDispatch dispatch)
+		{
+			if(this.executor != null)
+			{
+				this.executor.Execute(dispatch);
+			}
+		}
+
+		internal void ClearMessagesInProgress()
+		{
+			if( this.executor != null ) {
+				this.executor.ClearMessagesInProgress();
+			}
+
+			// Because we are called from inside the Transport Reconnection logic
+			// we spawn the Consumer clear to another Thread so that we can avoid
+			// any lock contention that might exist between the consumer and the
+			// connection that is reconnecting.
+			lock(this.consumers.SyncRoot)
+			{
+				foreach(MessageConsumer consumer in this.consumers.Values)
+				{
+					consumer.InProgressClearRequired();
+					ThreadPool.QueueUserWorkItem(ClearMessages, consumer);
+				}
+			}
+		}
+
+		private void ClearMessages(object value)
+		{
+			MessageConsumer consumer = value as MessageConsumer;
+
+			if(Tracer.IsDebugEnabled)
+			{
+				Tracer.Debug("Performing Async Clear of In Progress Messages on Consumer: " + consumer.ConsumerId);
+			}
+
+			consumer.ClearMessagesInProgress();
+		}
+
+		internal void Acknowledge()
+		{
+			lock(this.consumers.SyncRoot)
+			{
+				foreach(MessageConsumer consumer in this.consumers.Values)
+				{
+					consumer.Acknowledge();
+				}
+			}
+		}
+
+		private ActiveMQMessage ConfigureMessage(ActiveMQMessage message)
+		{
+			message.Connection = this.connection;
+
+			if(this.IsTransacted)
+			{
+				// Allows Acknowledge to be called in a transaction with no effect per JMS Spec.
+				message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge);
+			}
+
+			return message;
+		}
+
+		internal void SendAck(MessageAck ack)
+		{
+			this.SendAck(ack, false);
+		}
+
+		internal void SendAck(MessageAck ack, bool lazy)
+		{
+			if(lazy || connection.SendAcksAsync || this.IsTransacted )
+			{
+				this.connection.Oneway(ack);
+			}
+			else
+			{
+				this.connection.SyncRequest(ack);
+			}
+		}
+
+		/// <summary>
+		/// Prevents message from throwing an exception if a client calls Acknowledge on
+		/// a message that is part of a transaction either being produced or consumed.  The
+		/// JMS Spec indicates that users should be able to call Acknowledge with no effect
+		/// if the message is in a transaction.
+		/// </summary>
+		/// <param name="message">
+		/// A <see cref="ActiveMQMessage"/>
+		/// </param>
+		private void DoNothingAcknowledge(ActiveMQMessage message)
+		{
+		}
 
-    }
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Connection.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Connection.cs Mon Aug 30 18:04:21 2010
@@ -307,5 +307,33 @@ namespace Apache.NMS.EMS
 				}
 			}
 		}
+
+		private ConsumerTransformerDelegate consumerTransformer;
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.  The
+		/// ConnectionFactory sets the provided delegate instance on each Connection instance that
+		/// is created from this factory, each connection in turn passes the delegate along to each
+		/// Session it creates which then passes that along to the Consumers it creates.
+		/// </summary>
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		private ProducerTransformerDelegate producerTransformer;
+		/// <summary>
+		/// A delegate that is called each time a Message is sent from this Producer which allows
+		/// the application to perform any needed transformations on the Message before it is sent.
+		/// The ConnectionFactory sets the provided delegate instance on each Connection instance that
+		/// is created from this factory, each connection in turn passes the delegate along to each
+		/// Session it creates which then passes that along to the Producers it creates.
+		/// </summary>
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/ConnectionFactory.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/ConnectionFactory.cs Mon Aug 30 18:04:21 2010
@@ -111,7 +111,7 @@ namespace Apache.NMS.EMS
 			try
 			{
 				connection = EMSConvert.ToNMSConnection(this.tibcoConnectionFactory.CreateConnection());
-				connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+				ConfigureConnection(connection);
 			}
 			catch(Exception ex)
 			{
@@ -131,7 +131,7 @@ namespace Apache.NMS.EMS
 			try
 			{
 				connection = EMSConvert.ToNMSConnection(this.tibcoConnectionFactory.CreateConnection(userName, password));
-				connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+				ConfigureConnection(connection);
 			}
 			catch(Exception ex)
 			{
@@ -142,6 +142,17 @@ namespace Apache.NMS.EMS
 		}
 
 		/// <summary>
+		/// Configure the newly created connection.
+		/// </summary>
+		/// <param name="connection"></param>
+		private void ConfigureConnection(IConnection connection)
+		{
+			connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+			connection.ConsumerTransformer = this.consumerTransformer;
+			connection.ProducerTransformer = this.producerTransformer;
+		}
+
+		/// <summary>
 		/// Get/or set the broker Uri.
 		/// </summary>
 		public Uri BrokerUri
@@ -202,6 +213,34 @@ namespace Apache.NMS.EMS
 			}
 		}
 
+		private ConsumerTransformerDelegate consumerTransformer;
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.  The
+		/// ConnectionFactory sets the provided delegate instance on each Connection instance that
+		/// is created from this factory, each connection in turn passes the delegate along to each
+		/// Session it creates which then passes that along to the Consumers it creates.
+		/// </summary>
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		private ProducerTransformerDelegate producerTransformer;
+		/// <summary>
+		/// A delegate that is called each time a Message is sent from this Producer which allows
+		/// the application to perform any needed transformations on the Message before it is sent.
+		/// The ConnectionFactory sets the provided delegate instance on each Connection instance that
+		/// is created from this factory, each connection in turn passes the delegate along to each
+		/// Session it creates which then passes that along to the Producers it creates.
+		/// </summary>
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
+
 		#endregion
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageConsumer.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageConsumer.cs Mon Aug 30 18:04:21 2010
@@ -41,6 +41,17 @@ namespace Apache.NMS.EMS
 
 		#region IMessageConsumer Members
 
+		private ConsumerTransformerDelegate consumerTransformer;
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.
+		/// </summary>
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
 		/// <summary>
 		/// Waits until a message is available and returns it
 		/// </summary>
@@ -158,6 +169,16 @@ namespace Apache.NMS.EMS
 
 			if(null != message)
 			{
+				if(this.ConsumerTransformer != null)
+				{
+					IMessage newMessage = ConsumerTransformer(this.nmsSession, this, message);
+
+					if(newMessage != null)
+					{
+						message = newMessage;
+					}
+				}
+
 				if(Listener != null)
 				{
 					try

Modified: activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageProducer.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageProducer.cs Mon Aug 30 18:04:21 2010
@@ -40,6 +40,22 @@ namespace Apache.NMS.EMS
 			Dispose(false);
 		}
 
+		private Apache.NMS.EMS.Message GetEMSMessage(Apache.NMS.IMessage message)
+		{
+			Apache.NMS.EMS.Message msg = (Apache.NMS.EMS.Message) message;
+
+			if(this.ProducerTransformer != null)
+			{
+				IMessage transformed = this.ProducerTransformer(this.nmsSession, this, message);
+				if(transformed != null)
+				{
+					msg = (Apache.NMS.EMS.Message) transformed;
+				}
+			}
+
+			return msg;
+		}
+
 		#region IMessageProducer Members
 
 		/// <summary>
@@ -47,7 +63,7 @@ namespace Apache.NMS.EMS
 		/// </summary>
 		public void Send(Apache.NMS.IMessage message)
 		{
-			Apache.NMS.EMS.Message msg = (Apache.NMS.EMS.Message) message;
+			Apache.NMS.EMS.Message msg = GetEMSMessage(message);
 			long timeToLive = (long) message.NMSTimeToLive.TotalMilliseconds;
 
 			if(0 == timeToLive)
@@ -74,7 +90,7 @@ namespace Apache.NMS.EMS
 		/// </summary>
 		public void Send(Apache.NMS.IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
 		{
-			Apache.NMS.EMS.Message msg = (Apache.NMS.EMS.Message) message;
+			Apache.NMS.EMS.Message msg = GetEMSMessage(message);
 
 			try
 			{
@@ -96,7 +112,7 @@ namespace Apache.NMS.EMS
 		public void Send(Apache.NMS.IDestination destination, Apache.NMS.IMessage message)
 		{
 			Apache.NMS.EMS.Destination dest = (Apache.NMS.EMS.Destination) destination;
-			Apache.NMS.EMS.Message msg = (Apache.NMS.EMS.Message) message;
+			Apache.NMS.EMS.Message msg = GetEMSMessage(message);
 			long timeToLive = (long) message.NMSTimeToLive.TotalMilliseconds;
 
 			if(0 == timeToLive)
@@ -126,7 +142,7 @@ namespace Apache.NMS.EMS
 						MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
 		{
 			Apache.NMS.EMS.Destination dest = (Apache.NMS.EMS.Destination) destination;
-			Apache.NMS.EMS.Message msg = (Apache.NMS.EMS.Message) message;
+			Apache.NMS.EMS.Message msg = GetEMSMessage(message);
 
 			try
 			{
@@ -143,6 +159,17 @@ namespace Apache.NMS.EMS
 			}
 		}
 
+		private ProducerTransformerDelegate producerTransformer;
+		/// <summary>
+		/// A delegate that is called each time a Message is sent from this Producer which allows
+		/// the application to perform any needed transformations on the Message before it is sent.
+		/// </summary>
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
+
 		public MsgDeliveryMode DeliveryMode
 		{
 			get { return EMSConvert.ToNMSMsgDeliveryMode(this.tibcoMessageProducer.MsgDeliveryMode); }

Modified: activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Session.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Session.cs Mon Aug 30 18:04:21 2010
@@ -50,7 +50,9 @@ namespace Apache.NMS.EMS
 
 			try
 			{
-				return EMSConvert.ToNMSMessageProducer(this, this.tibcoSession.CreateProducer(destinationObj.tibcoDestination));
+				Apache.NMS.IMessageProducer producer = EMSConvert.ToNMSMessageProducer(this, this.tibcoSession.CreateProducer(destinationObj.tibcoDestination));
+				ConfigureProducer(producer);
+				return producer;
 			}
 			catch(Exception ex)
 			{
@@ -65,7 +67,9 @@ namespace Apache.NMS.EMS
 
 			try
 			{
-				return EMSConvert.ToNMSMessageConsumer(this, this.tibcoSession.CreateConsumer(destinationObj.tibcoDestination));
+				Apache.NMS.IMessageConsumer consumer = EMSConvert.ToNMSMessageConsumer(this, this.tibcoSession.CreateConsumer(destinationObj.tibcoDestination));
+				ConfigureConsumer(consumer);
+				return consumer;
 			}
 			catch(Exception ex)
 			{
@@ -80,7 +84,9 @@ namespace Apache.NMS.EMS
 
 			try
 			{
-				return EMSConvert.ToNMSMessageConsumer(this, this.tibcoSession.CreateConsumer(destinationObj.tibcoDestination, selector));
+				Apache.NMS.IMessageConsumer consumer = EMSConvert.ToNMSMessageConsumer(this, this.tibcoSession.CreateConsumer(destinationObj.tibcoDestination, selector));
+				ConfigureConsumer(consumer);
+				return consumer;
 			}
 			catch(Exception ex)
 			{
@@ -95,7 +101,9 @@ namespace Apache.NMS.EMS
 
 			try
 			{
-				return EMSConvert.ToNMSMessageConsumer(this, this.tibcoSession.CreateConsumer(destinationObj.tibcoDestination, selector, noLocal));
+				Apache.NMS.IMessageConsumer consumer = EMSConvert.ToNMSMessageConsumer(this, this.tibcoSession.CreateConsumer(destinationObj.tibcoDestination, selector, noLocal));
+				ConfigureConsumer(consumer);
+				return consumer;
 			}
 			catch(Exception ex)
 			{
@@ -110,7 +118,9 @@ namespace Apache.NMS.EMS
 
 			try
 			{
-				return EMSConvert.ToNMSMessageConsumer(this, this.tibcoSession.CreateDurableSubscriber(topicObj.tibcoTopic, name, selector, noLocal));
+				Apache.NMS.IMessageConsumer consumer = EMSConvert.ToNMSMessageConsumer(this, this.tibcoSession.CreateDurableSubscriber(topicObj.tibcoTopic, name, selector, noLocal));
+				ConfigureConsumer(consumer);
+				return consumer;
 			}
 			catch(Exception ex)
 			{
@@ -119,6 +129,16 @@ namespace Apache.NMS.EMS
 			}
 		}
 
+		private void ConfigureProducer(Apache.NMS.IMessageProducer producer)
+		{
+			producer.ProducerTransformer = this.ProducerTransformer;
+		}
+
+		private void ConfigureConsumer(Apache.NMS.IMessageConsumer consumer)
+		{
+			consumer.ConsumerTransformer = this.ConsumerTransformer;
+		}
+
 		public void DeleteDurableConsumer(string name)
 		{
 			try
@@ -331,7 +351,7 @@ namespace Apache.NMS.EMS
 				return null;
 			}
 		}
-		
+
 		public void Commit()
 		{
 			try
@@ -343,7 +363,7 @@ namespace Apache.NMS.EMS
 				ExceptionUtil.WrapAndThrowNMSException(ex);
 			}
 		}
-		
+
 		public void Rollback()
 		{
 			try
@@ -355,7 +375,31 @@ namespace Apache.NMS.EMS
 				ExceptionUtil.WrapAndThrowNMSException(ex);
 			}
 		}
-		
+
+		private ConsumerTransformerDelegate consumerTransformer;
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.
+		/// The Session instance sets the delegate on each Consumer it creates.
+		/// </summary>
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		private ProducerTransformerDelegate producerTransformer;
+		/// <summary>
+		/// A delegate that is called each time a Message is sent from this Producer which allows
+		/// the application to perform any needed transformations on the Message before it is sent.
+		/// The Session instance sets the delegate on each Producer it creates.
+		/// </summary>
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
+
 		// Properties
 
 		/// <summary>
@@ -367,7 +411,7 @@ namespace Apache.NMS.EMS
 			get { return this.requestTimeout; }
 			set { this.requestTimeout = value; }
 		}
-		
+
 		public bool Transacted
 		{
 			get { return this.tibcoSession.Transacted; }
@@ -405,7 +449,7 @@ namespace Apache.NMS.EMS
 		#endregion
 
 		#region IDisposable Members
-		
+
 		///<summary>
 		/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
 		///</summary>

Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnection.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnection.cs Mon Aug 30 18:04:21 2010
@@ -18,161 +18,153 @@ using System;
 
 namespace Apache.NMS
 {
-    /// <summary>
-    /// The mode used to acknowledge messages after they are consumed
-    /// </summary>
-    public enum AcknowledgementMode
-    {
-        /// <summary>
-        /// With this acknowledgment mode, the session will not
-        /// acknowledge receipt of a message since the broker assumes
-        /// successful receipt of a message after the onMessage handler
-        /// has returned without error.
-        /// </summary>
-        AutoAcknowledge,
-
-        /// <summary>
-        /// With this acknowledgment mode, the session automatically
-        /// acknowledges a client's receipt of a message either when
-        /// the session has successfully returned from a call to receive
-        /// or when the message listener the session has called to
-        /// process the message successfully returns.  Acknowlegements
-        /// may be delayed in this mode to increase performance at
-        /// the cost of the message being redelivered this client fails.
-        /// </summary>
-        DupsOkAcknowledge,
-
-        /// <summary>
-        /// With this acknowledgment mode, the client acknowledges a
-        /// consumed message by calling the message's acknowledge method.
-        /// This acknowledgement acknowledges the given message and all
-        /// unacknowedged messages that have preceeded it for the session
-        /// in which the message was delivered.
-        /// </summary>
-        ClientAcknowledge,
-
-        /// <summary>
-        /// Messages will be consumed when the transaction commits.
-        /// </summary>
-        Transactional,
-
-        /// <summary>
-        /// With this acknowledgment mode, the client acknowledges a
-        /// consumed message by calling the message's acknowledge method.
-        /// This acknowledgement mode allows the client to acknowledge a
-        /// single message.  This mode is not required to be supported by
-        /// all NMS providers, however the provider should throw an appropriate
-        /// exception to indicate that the mode is unsupported.
-        /// </summary>
-        IndividualAcknowledge
-    }
-
-    /// <summary>
-    /// A delegate that can receive transport level exceptions.
-    /// </summary>
-    public delegate void ExceptionListener(Exception exception);
-
-    /// <summary>
-    /// A delegate that is used by Fault tolerant NMS Implementation to notify their
-    /// clients that the Connection is not currently active to due some error.
-    /// </summary>
-    public delegate void ConnectionInterruptedListener();
-
-    /// <summary>
-    /// A delegate that is used by Fault tolerant NMS Implementation to notify their
-    /// clients that the Connection that was interrupted has now been restored.
-    /// </summary>
-    public delegate void ConnectionResumedListener();
-	
-    /// <summary>
-    /// Represents a connection with a message broker
-    /// </summary>
-    public interface IConnection : IDisposable, IStartable, IStoppable
-    {
-        /// <summary>
-        /// Creates a new session to work on this connection
-        /// </summary>
-        ISession CreateSession();
-
-        /// <summary>
-        /// Creates a new session to work on this connection
-        /// </summary>
-        ISession CreateSession(AcknowledgementMode acknowledgementMode);
-
-        /// <summary>
-        /// Closes the connection.
-        /// </summary>
-        void Close();
-
-        /// <summary>
-        /// An asynchronous listener which can be notified if an error occurs
-        /// </summary>
-        event ExceptionListener ExceptionListener;
-
-        /// <summary>
-        /// An asynchronous listener that is notified when a Fault tolerant connection
-        /// has been interrupted.
-        /// </summary>
-        event ConnectionInterruptedListener ConnectionInterruptedListener;
-
-        /// <summary>
-        /// An asynchronous listener that is notified when a Fault tolerant connection
-        /// has been resumed.
-        /// </summary>
-        event ConnectionResumedListener ConnectionResumedListener;
-		
-        /// <summary>
-        /// A Delegate that is called each time a Message is dispatched to allow the client to do
-        /// any necessary transformations on the received message before it is delivered.  The
-        /// Connection sets the provided delegate instance on each Session it creates which then
-        /// passes that along to the Consumers it creates.
-        /// </summary>
-        ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get;
-            set;
-        }
-
-        /// <summary>
-        /// A delegate that is called each time a Message is sent from this Producer which allows
-        /// the application to perform any needed transformations on the Message before it is sent.
-        /// The Connection sets the provided delegate instance on each Session it creates which then
-        /// passes that along to the Producer it creates.
-        /// </summary>
-        ProducerTransformerDelegate ProducerTransformer
-        {
-            get;
-            set;
-        }
-
-        #region Attributes
-
-        /// <summary>
-        /// The default timeout for network requests.
-        /// </summary>
-        TimeSpan RequestTimeout { get; set; }
-
-        /// <summary>
-        /// The default acknowledgement mode
-        /// </summary>
-        AcknowledgementMode AcknowledgementMode { get; set; }
-
-        /// <summary>
-        /// Sets the unique clienet ID for this connection before Start() or returns the
-        /// unique client ID after the connection has started
-        /// </summary>
-        string ClientId { get; set; }
+	/// <summary>
+	/// The mode used to acknowledge messages after they are consumed
+	/// </summary>
+	public enum AcknowledgementMode
+	{
+		/// <summary>
+		/// With this acknowledgment mode, the session will not
+		/// acknowledge receipt of a message since the broker assumes
+		/// successful receipt of a message after the onMessage handler
+		/// has returned without error.
+		/// </summary>
+		AutoAcknowledge,
+
+		/// <summary>
+		/// With this acknowledgment mode, the session automatically
+		/// acknowledges a client's receipt of a message either when
+		/// the session has successfully returned from a call to receive
+		/// or when the message listener the session has called to
+		/// process the message successfully returns.  Acknowledgements
+		/// may be delayed in this mode to increase performance at
+		/// the cost of the message being redelivered if this client fails.
+		/// </summary>
+		DupsOkAcknowledge,
+
+		/// <summary>
+		/// With this acknowledgment mode, the client acknowledges a
+		/// consumed message by calling the message's acknowledge method.
+		/// This acknowledgement acknowledges the given message and all
+		/// unacknowledged messages that have preceded it for the session
+		/// in which the message was delivered.
+		/// </summary>
+		ClientAcknowledge,
+
+		/// <summary>
+		/// Messages will be consumed when the transaction commits.
+		/// </summary>
+		Transactional,
+
+		/// <summary>
+		/// With this acknowledgment mode, the client acknowledges a
+		/// consumed message by calling the message's acknowledge method.
+		/// This acknowledgement mode allows the client to acknowledge a
+		/// single message.  This mode is not required to be supported by
+		/// all NMS providers, however the provider should throw an appropriate
+		/// exception to indicate that the mode is unsupported.
+		/// </summary>
+		IndividualAcknowledge
+	}
+
+	/// <summary>
+	/// A delegate that can receive transport level exceptions.
+	/// </summary>
+	public delegate void ExceptionListener(Exception exception);
+
+	/// <summary>
+	/// A delegate that is used by Fault tolerant NMS Implementation to notify their
+		/// clients that the Connection is not currently active due to some error.
+	/// </summary>
+	public delegate void ConnectionInterruptedListener();
+
+	/// <summary>
+	/// A delegate that is used by Fault tolerant NMS Implementation to notify their
+	/// clients that the Connection that was interrupted has now been restored.
+	/// </summary>
+	public delegate void ConnectionResumedListener();
+
+	/// <summary>
+	/// Represents a connection with a message broker
+	/// </summary>
+	public interface IConnection : IDisposable, IStartable, IStoppable
+	{
+		/// <summary>
+		/// Creates a new session to work on this connection
+		/// </summary>
+		ISession CreateSession();
+
+		/// <summary>
+		/// Creates a new session to work on this connection
+		/// </summary>
+		ISession CreateSession(AcknowledgementMode acknowledgementMode);
+
+		/// <summary>
+		/// Closes the connection.
+		/// </summary>
+		void Close();
+
+		/// <summary>
+		/// An asynchronous listener which can be notified if an error occurs
+		/// </summary>
+		event ExceptionListener ExceptionListener;
+
+		/// <summary>
+		/// An asynchronous listener that is notified when a Fault tolerant connection
+		/// has been interrupted.
+		/// </summary>
+		event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+		/// <summary>
+		/// An asynchronous listener that is notified when a Fault tolerant connection
+		/// has been resumed.
+		/// </summary>
+		event ConnectionResumedListener ConnectionResumedListener;
+
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.  The
+		/// Connection sets the provided delegate instance on each Session it creates which then
+		/// passes that along to the Consumers it creates.
+		/// </summary>
+		ConsumerTransformerDelegate ConsumerTransformer { get; set; }
+
+		/// <summary>
+		/// A delegate that is called each time a Message is sent from this Producer which allows
+		/// the application to perform any needed transformations on the Message before it is sent.
+		/// The Connection sets the provided delegate instance on each Session it creates which then
+		/// passes that along to the Producer it creates.
+		/// </summary>
+		ProducerTransformerDelegate ProducerTransformer { get; set; }
+
+		#region Attributes
+
+		/// <summary>
+		/// The default timeout for network requests.
+		/// </summary>
+		TimeSpan RequestTimeout { get; set; }
+
+		/// <summary>
+		/// The default acknowledgement mode
+		/// </summary>
+		AcknowledgementMode AcknowledgementMode { get; set; }
+
+		/// <summary>
+		/// Sets the unique client ID for this connection before Start() or returns the
+		/// unique client ID after the connection has started
+		/// </summary>
+		string ClientId { get; set; }
 
 		/// <summary>
 		/// Get/or set the redelivery policy for this connection.
 		/// </summary>
 		IRedeliveryPolicy RedeliveryPolicy { get; set; }
-		
+
 		/// <summary>
-        /// Gets the Meta Data for the NMS Connection instance.
-        /// </summary>
-        IConnectionMetaData MetaData{ get; }
+		/// Gets the Meta Data for the NMS Connection instance.
+		/// </summary>
+		IConnectionMetaData MetaData{ get; }
 
-        #endregion
-    }
+		#endregion
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnectionFactory.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnectionFactory.cs Mon Aug 30 18:04:21 2010
@@ -28,7 +28,7 @@ namespace Apache.NMS
 		/// Creates a new connection
 		/// </summary>
 		IConnection CreateConnection();
-		
+
 		/// <summary>
 		/// Creates a new connection with the given user name and password
 		/// </summary>
@@ -39,36 +39,28 @@ namespace Apache.NMS
 		/// </summary>
 		Uri BrokerUri { get; set; }
 
-        /// <summary>
-        /// Get/or set the redelivery policy that new IConnection objects are
-        /// assigned upon creation.
-        /// </summary>
-        IRedeliveryPolicy RedeliveryPolicy{ get; set; }
+		/// <summary>
+		/// Get/or set the redelivery policy that new IConnection objects are
+		/// assigned upon creation.
+		/// </summary>
+		IRedeliveryPolicy RedeliveryPolicy { get; set; }
 
-        /// <summary>
-        /// A Delegate that is called each time a Message is dispatched to allow the client to do
-        /// any necessary transformations on the received message before it is delivered.  The
-        /// ConnectionFactory sets the provided delegate instance on each Connection instance that
-        /// is created from this factory, each connection in turn passes the delegate along to each
-        /// Session it creates which then passes that along to the Consumers it creates.
-        /// </summary>
-        ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get;
-            set;
-        }
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.  The
+		/// ConnectionFactory sets the provided delegate instance on each Connection instance that
+		/// is created from this factory, each connection in turn passes the delegate along to each
+		/// Session it creates which then passes that along to the Consumers it creates.
+		/// </summary>
+		ConsumerTransformerDelegate ConsumerTransformer { get; set; }
 
-        /// <summary>
-        /// A delegate that is called each time a Message is sent from this Producer which allows
-        /// the application to perform any needed transformations on the Message before it is sent.
-        /// The ConnectionFactory sets the provided delegate instance on each Connection instance that
-        /// is created from this factory, each connection in turn passes the delegate along to each
-        /// Session it creates which then passes that along to the Producers it creates.
-        /// </summary>
-        ProducerTransformerDelegate ProducerTransformer
-        {
-            get;
-            set;
-        }
+		/// <summary>
+		/// A delegate that is called each time a Message is sent from this Producer which allows
+		/// the application to perform any needed transformations on the Message before it is sent.
+		/// The ConnectionFactory sets the provided delegate instance on each Connection instance that
+		/// is created from this factory, each connection in turn passes the delegate along to each
+		/// Session it creates which then passes that along to the Producers it creates.
+		/// </summary>
+		ProducerTransformerDelegate ProducerTransformer { get; set; }
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageConsumer.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageConsumer.cs Mon Aug 30 18:04:21 2010
@@ -21,16 +21,16 @@ namespace Apache.NMS
 	/// </summary>
 	public delegate void MessageListener(IMessage message);
 
-    /// <summary>
-    /// A delegate that a client can register that will be called each time a consumer dispatches a message
-    /// to the client code to allow the client to Transform a received message from one type to another,
-    /// StreamMessage to TextMessage, ObjectMessage to TextMessage containing XML, etc.  This allows a
-    /// client to create a consumer that will automatically transform a message to a type that the client is
-    /// capable of processing or adding additional information to a received message.  For messages that do
-    /// not need to be processed the client should return null from this method, in this case the original
-    /// message will be dispatched to the client.
-    /// </summary>
-    public delegate IMessage ConsumerTransformerDelegate(ISession session, IMessageConsumer consumer, IMessage message);
+	/// <summary>
+	/// A delegate that a client can register that will be called each time a consumer dispatches a message
+	/// to the client code to allow the client to Transform a received message from one type to another,
+	/// StreamMessage to TextMessage, ObjectMessage to TextMessage containing XML, etc.  This allows a
+	/// client to create a consumer that will automatically transform a message to a type that the client is
+	/// capable of processing or adding additional information to a received message.  For messages that do
+	/// not need to be processed the client should return null from this method, in this case the original
+	/// message will be dispatched to the client.
+	/// </summary>
+	public delegate IMessage ConsumerTransformerDelegate(ISession session, IMessageConsumer consumer, IMessage message);
 
 	/// <summary>
 	/// A consumer of messages
@@ -41,24 +41,24 @@ namespace Apache.NMS
 		/// Waits until a message is available and returns it
 		/// </summary>
 		IMessage Receive();
-		
+
 		/// <summary>
 		/// If a message is available within the timeout duration it is returned otherwise this method returns null
 		/// </summary>
 		IMessage Receive(System.TimeSpan timeout);
-		
+
 		/// <summary>
 		/// If a message is available immediately it is returned otherwise this method returns null
 		/// </summary>
 		IMessage ReceiveNoWait();
-		
+
 		/// <summary>
 		/// An asynchronous listener which can be used to consume messages asynchronously
 		/// </summary>
 		event MessageListener Listener;
 
 		/// <summary>
-		/// Closes the message consumer. 
+		/// Closes the message consumer.
 		/// </summary>
 		/// <remarks>
 		/// Clients should close message consumers them when they are not needed.
@@ -67,17 +67,13 @@ namespace Apache.NMS
 		/// </remarks>
 		void Close();
 
-        /// <summary>
-        /// A Delegate that is called each time a Message is dispatched to allow the client to do
-        /// any necessary transformations on the received message before it is delivered.
-        /// </summary>
-        ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get;
-            set;
-        }
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.
+		/// </summary>
+		ConsumerTransformerDelegate ConsumerTransformer { get; set; }
 
-    }
+	}
 }
 
 

Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageProducer.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageProducer.cs Mon Aug 30 18:04:21 2010
@@ -18,16 +18,16 @@ using System;
 
 namespace Apache.NMS
 {
-    /// <summary>
-    /// A delegate that a client can register that will be called each time a Producer's send method is
-    /// called to allow the client to Transform a sent message from one type to another, StreamMessage to
-    /// TextMessage, ObjectMessage to TextMessage containing XML, etc.  This allows a client to create a
-    /// producer that will automatically transform a message to a type that some receiving client is
-    /// capable of processing or adding additional information to a sent message such as additional message
-    /// headers, etc.  For messages that do not need to be processed the client should return null from
-    /// this method, in this case the original message will be sent.
-    /// </summary>
-    public delegate IMessage ProducerTransformerDelegate(ISession session, IMessageProducer producer, IMessage message);
+	/// <summary>
+	/// A delegate that a client can register that will be called each time a Producer's send method is
+	/// called to allow the client to Transform a sent message from one type to another, StreamMessage to
+	/// TextMessage, ObjectMessage to TextMessage containing XML, etc.  This allows a client to create a
+	/// producer that will automatically transform a message to a type that some receiving client is
+	/// capable of processing or adding additional information to a sent message such as additional message
+	/// headers, etc.  For messages that do not need to be processed the client should return null from
+	/// this method, in this case the original message will be sent.
+	/// </summary>
+	public delegate IMessage ProducerTransformerDelegate(ISession session, IMessageProducer producer, IMessage message);
 
 	/// <summary>
 	/// An object capable of sending messages to some destination
@@ -59,15 +59,11 @@ namespace Apache.NMS
 		/// </summary>
 		void Close();
 
-        /// <summary>
-        /// A delegate that is called each time a Message is sent from this Producer which allows
-        /// the application to perform any needed transformations on the Message before it is sent.
-        /// </summary>
-        ProducerTransformerDelegate ProducerTransformer
-        {
-            get;
-            set;
-        }
+		/// <summary>
+		/// A delegate that is called each time a Message is sent from this Producer which allows
+		/// the application to perform any needed transformations on the Message before it is sent.
+		/// </summary>
+		ProducerTransformerDelegate ProducerTransformer { get; set; }
 
 		MsgDeliveryMode DeliveryMode { get; set; }
 



Mime
View raw message