activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r990885 [1/3] - in /activemq/activemq-dotnet: Apache.NMS.ActiveMQ/trunk/src/main/csharp/ Apache.NMS.EMS/trunk/src/main/csharp/ Apache.NMS/trunk/src/main/csharp/
Date Mon, 30 Aug 2010 18:04:21 GMT
Author: jgomes
Date: Mon Aug 30 18:04:21 2010
New Revision: 990885

URL: http://svn.apache.org/viewvc?rev=990885&view=rev
Log:
Implement enhancements for EMS provider.
Fixes [AMQNET-271]. (See https://issues.apache.org/activemq/browse/AMQNET-271)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.EMS/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnection.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/ISession.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Mon Aug 30 18:04:21 2010
@@ -39,25 +39,25 @@ namespace Apache.NMS.ActiveMQ
 		private Uri brokerUri;
 		private string connectionUserName;
 		private string connectionPassword;
-        private string clientId;
-        private string clientIdPrefix;
-        private IdGenerator clientIdGenerator;
-
-        private bool useCompression;
-        private bool copyMessageOnSend = true;
-        private bool dispatchAsync = true;
-        private bool asyncSend;
-        private bool asyncClose;
-        private bool alwaysSyncSend;
-        private bool sendAcksAsync = true;
+		private string clientId;
+		private string clientIdPrefix;
+		private IdGenerator clientIdGenerator;
+
+		private bool useCompression;
+		private bool copyMessageOnSend = true;
+		private bool dispatchAsync = true;
+		private bool asyncSend;
+		private bool asyncClose;
+		private bool alwaysSyncSend;
+		private bool sendAcksAsync = true;
 		private int producerWindowSize = 0;
-        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
 		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
-        private bool messagePrioritySupported=true;
+		private bool messagePrioritySupported=true;
 
-        private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
-        private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
-        private ICompressionPolicy compressionPolicy = new CompressionPolicy();
+		private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+		private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+		private ICompressionPolicy compressionPolicy = new CompressionPolicy();
 
 		static ConnectionFactory()
 		{
@@ -106,57 +106,57 @@ namespace Apache.NMS.ActiveMQ
 
 		public IConnection CreateConnection(string userName, string password)
 		{
-            Connection connection = null;
+			Connection connection = null;
 
-            try
-            {
-    			Tracer.InfoFormat("Connecting to: {0}", brokerUri.ToString());
-    
-                ITransport transport = TransportFactory.CreateTransport(brokerUri);
-    
-                connection = new Connection(brokerUri, transport, this.ClientIdGenerator);
-    
-                ConfigureConnection(connection);
-    
-                connection.UserName = userName;
-                connection.Password = password;
-    
-                if(this.clientId != null)
-                {
-                    connection.DefaultClientId = this.clientId;
-                }
-    
-    			connection.ITransport.Start();
-    
-    			return connection;
-            }
-            catch(NMSException e)
-            {
-                try
-                {
-                    connection.Close();
-                }
-                catch
-                {
-                }
-
-                throw e;
-            }
-            catch(Exception e)
-            {
-                try
-                {
-                    connection.Close();
-                }
-                catch
-                {
-                }
+			try
+			{
+				Tracer.InfoFormat("Connecting to: {0}", brokerUri.ToString());
+
+				ITransport transport = TransportFactory.CreateTransport(brokerUri);
+
+				connection = new Connection(brokerUri, transport, this.ClientIdGenerator);
+
+				ConfigureConnection(connection);
 
-                throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e);
-            }
+				connection.UserName = userName;
+				connection.Password = password;
+
+				if(this.clientId != null)
+				{
+					connection.DefaultClientId = this.clientId;
+				}
+
+				connection.ITransport.Start();
+
+				return connection;
+			}
+			catch(NMSException e)
+			{
+				try
+				{
+					connection.Close();
+				}
+				catch
+				{
+				}
+
+				throw e;
+			}
+			catch(Exception e)
+			{
+				try
+				{
+					connection.Close();
+				}
+				catch
+				{
+				}
+
+				throw NMSExceptionSupport.Create("Could not connect to broker URL: " + this.brokerUri + ". Reason: " + e.Message, e);
+			}
 		}
 
-        #region ConnectionFactory Properties
+		#region ConnectionFactory Properties
 
 		/// <summary>
 		/// Get/or set the broker Uri.
@@ -165,30 +165,30 @@ namespace Apache.NMS.ActiveMQ
 		{
 			get { return brokerUri; }
 			set
-            {
-                brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "activemq:"));
+			{
+				brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "activemq:"));
 
-                if(brokerUri.Query != null)
-                {
-                    StringDictionary properties = URISupport.ParseQuery(brokerUri.Query);
-				
-    				StringDictionary connection = URISupport.ExtractProperties(properties, "connection.");
-    				StringDictionary nms = URISupport.ExtractProperties(properties, "nms.");
-    				
-    				if(connection != null)
-    				{
-                    	URISupport.SetProperties(this, connection, "connection.");
-    				}
-    				
-    				if(nms != null)
-    				{
-                    	URISupport.SetProperties(this.PrefetchPolicy, nms, "nms.PrefetchPolicy.");
-                    	URISupport.SetProperties(this.RedeliveryPolicy, nms, "nms.RedeliveryPolicy.");
-    				}
-
-                    brokerUri = URISupport.CreateRemainingUri(brokerUri, properties);
-                }
-            }
+				if(brokerUri.Query != null)
+				{
+					StringDictionary properties = URISupport.ParseQuery(brokerUri.Query);
+
+					StringDictionary connection = URISupport.ExtractProperties(properties, "connection.");
+					StringDictionary nms = URISupport.ExtractProperties(properties, "nms.");
+
+					if(connection != null)
+					{
+						URISupport.SetProperties(this, connection, "connection.");
+					}
+
+					if(nms != null)
+					{
+						URISupport.SetProperties(this.PrefetchPolicy, nms, "nms.PrefetchPolicy.");
+						URISupport.SetProperties(this.RedeliveryPolicy, nms, "nms.RedeliveryPolicy.");
+					}
+
+					brokerUri = URISupport.CreateRemainingUri(brokerUri, properties);
+				}
+			}
 		}
 
 		public string UserName
@@ -209,59 +209,59 @@ namespace Apache.NMS.ActiveMQ
 			set { clientId = value; }
 		}
 
-        public string ClientIdPrefix
-        {
-            get { return clientIdPrefix; }
-            set { clientIdPrefix = value; }
-        }
-
-        public bool UseCompression
-        {
-            get { return this.useCompression; }
-            set { this.useCompression = value; }
-        }
-
-        public bool CopyMessageOnSend
-        {
-            get { return copyMessageOnSend; }
-            set { copyMessageOnSend = value; }
-        }
-
-        public bool AlwaysSyncSend
-        {
-            get { return alwaysSyncSend; }
-            set { alwaysSyncSend = value; }
-        }
-
-        public bool AsyncClose
-        {
-            get { return asyncClose; }
-            set { asyncClose = value; }
-        }
-
-        public bool SendAcksAsync
-        {
-            get { return sendAcksAsync; }
-            set { sendAcksAsync = value; }
-        }
-
-        public bool AsyncSend
-        {
-            get { return asyncSend; }
-            set { asyncSend = value; }
-        }
-
-        public bool DispatchAsync
-        {
-            get { return this.dispatchAsync; }
-            set { this.dispatchAsync = value; }
-        }
-
-        public bool MessagePrioritySupported
-        {
-            get { return this.messagePrioritySupported; }
-            set { this.messagePrioritySupported = value; }
-        }
+		public string ClientIdPrefix
+		{
+			get { return clientIdPrefix; }
+			set { clientIdPrefix = value; }
+		}
+
+		public bool UseCompression
+		{
+			get { return this.useCompression; }
+			set { this.useCompression = value; }
+		}
+
+		public bool CopyMessageOnSend
+		{
+			get { return copyMessageOnSend; }
+			set { copyMessageOnSend = value; }
+		}
+
+		public bool AlwaysSyncSend
+		{
+			get { return alwaysSyncSend; }
+			set { alwaysSyncSend = value; }
+		}
+
+		public bool AsyncClose
+		{
+			get { return asyncClose; }
+			set { asyncClose = value; }
+		}
+
+		public bool SendAcksAsync
+		{
+			get { return sendAcksAsync; }
+			set { sendAcksAsync = value; }
+		}
+
+		public bool AsyncSend
+		{
+			get { return asyncSend; }
+			set { asyncSend = value; }
+		}
+
+		public bool DispatchAsync
+		{
+			get { return this.dispatchAsync; }
+			set { this.dispatchAsync = value; }
+		}
+
+		public bool MessagePrioritySupported
+		{
+			get { return this.messagePrioritySupported; }
+			set { this.messagePrioritySupported = value; }
+		}
 
 		public int RequestTimeout
 		{
@@ -269,76 +269,76 @@ namespace Apache.NMS.ActiveMQ
 			set { this.requestTimeout = TimeSpan.FromMilliseconds(value); }
 		}
 
-        public string AckMode
-        {
-            set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
-        }
-
-        public AcknowledgementMode AcknowledgementMode
-        {
-            get { return acknowledgementMode; }
-            set { this.acknowledgementMode = value; }
-        }
-
-        public int ProducerWindowSize
-        {
-            get { return producerWindowSize; }
-            set { producerWindowSize = value; }
-        }
-		
-        public PrefetchPolicy PrefetchPolicy
-        {
-            get { return this.prefetchPolicy; }
-            set { this.prefetchPolicy = value; }
-        }
-
-        public IRedeliveryPolicy RedeliveryPolicy
-        {
-            get { return this.redeliveryPolicy; }
-            set 
-            {
-                if(value != null)
-                {
-                    this.redeliveryPolicy = value; 
-                }
-            }
-        }
-
-        public ICompressionPolicy CompressionPolicy
-        {
-            get { return this.compressionPolicy; }
-            set 
-            {
-                if(value != null)
-                {
-                    this.compressionPolicy = value; 
-                }
-            }
-        }
-
-        public IdGenerator ClientIdGenerator
-        {
-            set { this.clientIdGenerator = value; }
-            get
-            {
-                lock(this)
-                {
-                    if(this.clientIdGenerator == null)
-                    {
-                        if(this.clientIdPrefix != null)
-                        {
-                            this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
-                        }
-                        else
-                        {
-                            this.clientIdGenerator = new IdGenerator();
-                        }
-                    }
-
-                    return this.clientIdGenerator;
-                }
-            }
-        }
+		public string AckMode
+		{
+			set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
+		}
+
+		public AcknowledgementMode AcknowledgementMode
+		{
+			get { return acknowledgementMode; }
+			set { this.acknowledgementMode = value; }
+		}
+
+		public int ProducerWindowSize
+		{
+			get { return producerWindowSize; }
+			set { producerWindowSize = value; }
+		}
+
+		public PrefetchPolicy PrefetchPolicy
+		{
+			get { return this.prefetchPolicy; }
+			set { this.prefetchPolicy = value; }
+		}
+
+		public IRedeliveryPolicy RedeliveryPolicy
+		{
+			get { return this.redeliveryPolicy; }
+			set
+			{
+				if(value != null)
+				{
+					this.redeliveryPolicy = value;
+				}
+			}
+		}
+
+		public ICompressionPolicy CompressionPolicy
+		{
+			get { return this.compressionPolicy; }
+			set
+			{
+				if(value != null)
+				{
+					this.compressionPolicy = value;
+				}
+			}
+		}
+
+		public IdGenerator ClientIdGenerator
+		{
+			set { this.clientIdGenerator = value; }
+			get
+			{
+				lock(this)
+				{
+					if(this.clientIdGenerator == null)
+					{
+						if(this.clientIdPrefix != null)
+						{
+							this.clientIdGenerator = new IdGenerator(this.clientIdPrefix);
+						}
+						else
+						{
+							this.clientIdGenerator = new IdGenerator();
+						}
+					}
+
+					return this.clientIdGenerator;
+				}
+			}
+		}
 
 		public event ExceptionListener OnException
 		{
@@ -353,6 +353,13 @@ namespace Apache.NMS.ActiveMQ
 		}
 
 		private ConsumerTransformerDelegate consumerTransformer;
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.  The
+		/// ConnectionFactory sets the provided delegate instance on each Connection instance that
+		/// is created from this factory; each connection in turn passes the delegate along to each
+		/// Session it creates, which then passes it along to the Consumers it creates.
+		/// </summary>
 		public ConsumerTransformerDelegate ConsumerTransformer
 		{
 			get { return this.consumerTransformer; }
@@ -360,33 +367,40 @@ namespace Apache.NMS.ActiveMQ
 		}
 
 		private ProducerTransformerDelegate producerTransformer;
+		/// <summary>
+		/// A delegate that is called each time a Message is sent from this Producer which allows
+		/// the application to perform any needed transformations on the Message before it is sent.
+		/// The ConnectionFactory sets the provided delegate instance on each Connection instance that
+		/// is created from this factory; each connection in turn passes the delegate along to each
+		/// Session it creates, which then passes it along to the Producers it creates.
+		/// </summary>
 		public ProducerTransformerDelegate ProducerTransformer
 		{
 			get { return this.producerTransformer; }
 			set { this.producerTransformer = value; }
 		}
-		
-        #endregion
 
-        protected virtual void ConfigureConnection(Connection connection)
-        {
-            connection.AsyncClose = this.AsyncClose;
-            connection.AsyncSend = this.AsyncSend;
-            connection.CopyMessageOnSend = this.CopyMessageOnSend;
-            connection.AlwaysSyncSend = this.AlwaysSyncSend;
-            connection.DispatchAsync = this.DispatchAsync;
-            connection.SendAcksAsync = this.SendAcksAsync;
-            connection.AcknowledgementMode = this.acknowledgementMode;
-            connection.UseCompression = this.useCompression;
+		#endregion
+
+		protected virtual void ConfigureConnection(Connection connection)
+		{
+			connection.AsyncClose = this.AsyncClose;
+			connection.AsyncSend = this.AsyncSend;
+			connection.CopyMessageOnSend = this.CopyMessageOnSend;
+			connection.AlwaysSyncSend = this.AlwaysSyncSend;
+			connection.DispatchAsync = this.DispatchAsync;
+			connection.SendAcksAsync = this.SendAcksAsync;
+			connection.AcknowledgementMode = this.acknowledgementMode;
+			connection.UseCompression = this.useCompression;
 			connection.RequestTimeout = this.requestTimeout;
 			connection.ProducerWindowSize = this.producerWindowSize;
-            connection.MessagePrioritySupported = this.messagePrioritySupported;
-            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
-            connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
-            connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
+			connection.MessagePrioritySupported = this.messagePrioritySupported;
+			connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+			connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+			connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
 			connection.ConsumerTransformer = this.consumerTransformer;
 			connection.ProducerTransformer = this.producerTransformer;
-        }
+		}
 
 		protected static void ExceptionHandler(Exception ex)
 		{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Mon Aug 30 18:04:21 2010
@@ -40,7 +40,7 @@ namespace Apache.NMS.ActiveMQ
 	/// </summary>
 	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
-        private readonly MessageTransformation messageTransformation;
+		private readonly MessageTransformation messageTransformation;
 		private readonly MessageDispatchChannel unconsumedMessages;
 		private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
 		private readonly ConsumerInfo info;
@@ -60,7 +60,7 @@ namespace Apache.NMS.ActiveMQ
 		private int dispatchedCount = 0;
 		private volatile bool synchronizationRegistered = false;
 		private bool clearDispatchList = false;
-        private bool inProgressClearRequiredFlag;
+		private bool inProgressClearRequiredFlag;
 
 		private const int DEFAULT_REDELIVERY_DELAY = 0;
 		private const int DEFAULT_MAX_REDELIVERIES = 5;
@@ -70,46 +70,46 @@ namespace Apache.NMS.ActiveMQ
 		private IRedeliveryPolicy redeliveryPolicy;
 
 		// Constructor internal to prevent clients from creating an instance.
-		internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination, 
-                                 String name, String selector, int prefetch, int maxPendingMessageCount, 
-                                 bool noLocal, bool browser, bool dispatchAsync )
-		{
-            if(destination == null)
-            {
-                throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
-            }
-            
+		internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
+								 String name, String selector, int prefetch, int maxPendingMessageCount,
+								 bool noLocal, bool browser, bool dispatchAsync )
+		{
+			if(destination == null)
+			{
+				throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+			}
+
 			this.session = session;
-            this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
-            this.messageTransformation = this.session.Connection.MessageTransformation;
+			this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+			this.messageTransformation = this.session.Connection.MessageTransformation;
 
-            if(session.Connection.MessagePrioritySupported)
-            {
-                this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
-            }
-            else
-            {
-                this.unconsumedMessages = new FifoMessageDispatchChannel();
-            }
-
-            this.info = new ConsumerInfo();
-            this.info.ConsumerId = id;
-            this.info.Destination = destination;
-            this.info.SubscriptionName = name;
-            this.info.Selector = selector;
-            this.info.PrefetchSize = prefetch;
-            this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
-            this.info.NoLocal = noLocal;
-            this.info.Browser = browser;
-            this.info.DispatchAsync = dispatchAsync;
-            this.info.Retroactive = session.Retroactive;
-            this.info.Exclusive = session.Exclusive;
-            this.info.Priority = session.Priority;
-            
-            // If the destination contained a URI query, then use it to set public properties
-            // on the ConsumerInfo
-            if(destination.Options != null)
-            {
+			if(session.Connection.MessagePrioritySupported)
+			{
+				this.unconsumedMessages = new SimplePriorityMessageDispatchChannel();
+			}
+			else
+			{
+				this.unconsumedMessages = new FifoMessageDispatchChannel();
+			}
+
+			this.info = new ConsumerInfo();
+			this.info.ConsumerId = id;
+			this.info.Destination = destination;
+			this.info.SubscriptionName = name;
+			this.info.Selector = selector;
+			this.info.PrefetchSize = prefetch;
+			this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
+			this.info.NoLocal = noLocal;
+			this.info.Browser = browser;
+			this.info.DispatchAsync = dispatchAsync;
+			this.info.Retroactive = session.Retroactive;
+			this.info.Exclusive = session.Exclusive;
+			this.info.Priority = session.Priority;
+
+			// If the destination contained a URI query, then use it to set public properties
+			// on the ConsumerInfo
+			if(destination.Options != null)
+			{
 				// Get options prefixed with "consumer.*"
 				StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
 				// Extract out custom extension options "consumer.nms.*"
@@ -117,7 +117,7 @@ namespace Apache.NMS.ActiveMQ
 
 				URISupport.SetProperties(this.info, options);
 				URISupport.SetProperties(this, customConsumerOptions, "nms.");
-            }
+			}
 		}
 
 		~MessageConsumer()
@@ -137,10 +137,10 @@ namespace Apache.NMS.ActiveMQ
 			get { return this.info.ConsumerId; }
 		}
 
-        public ConsumerInfo ConsumerInfo
-        {
-            get { return this.info; }
-        }
+		public ConsumerInfo ConsumerInfo
+		{
+			get { return this.info; }
+		}
 
 		public int RedeliveryTimeout
 		{
@@ -158,11 +158,11 @@ namespace Apache.NMS.ActiveMQ
 			get { return this.redeliveryPolicy; }
 			set { this.redeliveryPolicy = value; }
 		}
-        
-        public long UnconsumedMessageCount
-        {
-            get { return this.unconsumedMessages.Count; }
-        }
+
+		public long UnconsumedMessageCount
+		{
+			get { return this.unconsumedMessages.Count; }
+		}
 
 		// Custom Options
 		private bool ignoreExpiration = false;
@@ -171,17 +171,21 @@ namespace Apache.NMS.ActiveMQ
 			get { return ignoreExpiration; }
 			set { ignoreExpiration = value; }
 		}
-		
+
 		#endregion
 
 		#region IMessageConsumer Members
 
-        private ConsumerTransformerDelegate consumerTransformer;
-        public ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get { return this.consumerTransformer; }
-            set { this.consumerTransformer = value; }
-        }
+		private ConsumerTransformerDelegate consumerTransformer;
+		/// <summary>
+		/// A Delegate that is called each time a Message is dispatched to allow the client to do
+		/// any necessary transformations on the received message before it is delivered.
+		/// </summary>
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
 
 		public event MessageListener Listener
 		{
@@ -336,8 +340,8 @@ namespace Apache.NMS.ActiveMQ
 		{
 			if(!this.unconsumedMessages.Closed)
 			{
-                Tracer.Debug("Closing down the Consumer");
-                
+				Tracer.Debug("Closing down the Consumer");
+
 				// Do we have any acks we need to send out before closing?
 				// Ack any delivered messages now.
 				if(!this.session.IsTransacted)
@@ -351,7 +355,7 @@ namespace Apache.NMS.ActiveMQ
 
 				if(!this.session.IsTransacted)
 				{
-                    lock(this.dispatchedMessages)
+					lock(this.dispatchedMessages)
 					{
 						dispatchedMessages.Clear();
 					}
@@ -360,15 +364,15 @@ namespace Apache.NMS.ActiveMQ
 				this.unconsumedMessages.Close();
 				this.session.RemoveConsumer(this.info.ConsumerId);
 
-                RemoveInfo removeCommand = new RemoveInfo();
+				RemoveInfo removeCommand = new RemoveInfo();
 				removeCommand.ObjectId = this.info.ConsumerId;
 				removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
 
 				this.session.Connection.Oneway(removeCommand);
 				this.session = null;
 
-                Tracer.Debug("Consumer instance Closed.");
-            }
+				Tracer.Debug("Consumer instance Closed.");
+			}
 		}
 
 		#endregion
@@ -383,11 +387,11 @@ namespace Apache.NMS.ActiveMQ
 				messagePull.Timeout = timeout;
 				messagePull.ResponseRequired = false;
 
-                if(Tracer.IsDebugEnabled)
-                {                    
-				    Tracer.Debug("Sending MessagePull: " + messagePull);
-                }
-                
+				if(Tracer.IsDebugEnabled)
+				{
+					Tracer.Debug("Sending MessagePull: " + messagePull);
+				}
+
 				session.Connection.Oneway(messagePull);
 			}
 		}
@@ -408,8 +412,8 @@ namespace Apache.NMS.ActiveMQ
 					}
 				}
 			}
-			
-			if(dispatch == null) 
+
+			if(dispatch == null)
 			{
 				Tracer.DebugFormat("Attempt to Ack MessageId[{0}] failed because the original dispatch is not in the Dispatch List", message.MessageId);
 				return;
@@ -456,38 +460,38 @@ namespace Apache.NMS.ActiveMQ
 			this.unconsumedMessages.Stop();
 		}
 
-        internal void InProgressClearRequired()
-        {
-            inProgressClearRequiredFlag = true;
-            // deal with delivered messages async to avoid lock contention with in progress acks
-            clearDispatchList = true;
-        }
-
-        internal void ClearMessagesInProgress()
-        {
-            if(inProgressClearRequiredFlag)
-            {
-                // Called from a thread in the ThreadPool, so we wait until we can
-                // get a lock on the unconsumed list then we clear it.
-                lock(this.unconsumedMessages)
-                {
-                    if(inProgressClearRequiredFlag)
-                    {
-                        if(Tracer.IsDebugEnabled)
-                        {
-                            Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
-                                         this.unconsumedMessages.Count + ") on transport interrupt");
-                        }
-
-                        this.unconsumedMessages.Clear();
-
-                        // allow dispatch on this connection to resume
-                        this.session.Connection.TransportInterruptionProcessingComplete();
-                        this.inProgressClearRequiredFlag = false;
-                    }
-                }
-            }
-        }
+		internal void InProgressClearRequired()
+		{
+			inProgressClearRequiredFlag = true;
+			// deal with delivered messages async to avoid lock contention with in progress acks
+			clearDispatchList = true;
+		}
+
+		internal void ClearMessagesInProgress()
+		{
+			if(inProgressClearRequiredFlag)
+			{
+				// Called from a thread in the ThreadPool, so we wait until we can
+				// get a lock on the unconsumed list then we clear it.
+				lock(this.unconsumedMessages)
+				{
+					if(inProgressClearRequiredFlag)
+					{
+						if(Tracer.IsDebugEnabled)
+						{
+							Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
+										 this.unconsumedMessages.Count + ") on transport interrupt");
+						}
+
+						this.unconsumedMessages.Clear();
+
+						// allow dispatch on this connection to resume
+						this.session.Connection.TransportInterruptionProcessingComplete();
+						this.inProgressClearRequiredFlag = false;
+					}
+				}
+			}
+		}
 
 		public void DeliverAcks()
 		{
@@ -555,10 +559,10 @@ namespace Apache.NMS.ActiveMQ
 						{
 							// on resumption a pending delivered ack will be out of sync with
 							// re-deliveries.
-                            if(Tracer.IsDebugEnabled)
-                            {
-							    Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
-                            }
+							if(Tracer.IsDebugEnabled)
+							{
+								Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck);
+							}
 							this.pendingAck = null;
 						}
 					}
@@ -596,13 +600,13 @@ namespace Apache.NMS.ActiveMQ
 
 								Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
 
-                                // If aborted we stop the abort here and let normal processing resume.
-                                // This allows the session to shutdown normally and ack all messages
-                                // that have outstanding acks in this consumer.
-                                if( (Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
-                                {
-                                    Thread.ResetAbort();
-                                }
+								// If aborted we stop the abort here and let normal processing resume.
+								// This allows the session to shutdown normally and ack all messages
+								// that have outstanding acks in this consumer.
+								if( (Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
+								{
+									Thread.ResetAbort();
+								}
 							}
 						}
 						else
@@ -653,9 +657,9 @@ namespace Apache.NMS.ActiveMQ
 		/// <summary>
 		/// Used to get an enqueued message from the unconsumedMessages list. The
 		/// amount of time this method blocks is based on the timeout value.  if
-		/// timeout == Timeout.Infinite then it blocks until a message is received. 
-		/// if timeout == 0 then it it tries to not block at all, it returns a 
-		/// message if it is available if timeout > 0 then it blocks up to timeout 
+		/// timeout == Timeout.Infinite then it blocks until a message is received.
+		/// If timeout == 0 then it tries not to block at all; it returns a
+		/// message if one is available.  If timeout > 0 then it blocks up to timeout
 		/// amount of time.  Expired messages will consumed by this method.
 		/// </summary>
 		/// <param name="timeout">
@@ -800,14 +804,14 @@ namespace Apache.NMS.ActiveMQ
 				else if(IsClientAcknowledge || IsIndividualAcknowledge)
 				{
 					bool messageAckedByConsumer = false;
-					
+
 					lock(this.dispatchedMessages)
 					{
 						messageAckedByConsumer = this.dispatchedMessages.Contains(dispatch);
 					}
-					
+
 					if(messageAckedByConsumer)
-					{					
+					{
 						AckLater(dispatch, AckType.DeliveredAck);
 					}
 				}
@@ -886,19 +890,19 @@ namespace Apache.NMS.ActiveMQ
 				// ack and hence important, send it now so it is not lost.
 				if(oldPendingAck.AckType != (byte) AckType.DeliveredAck)
 				{
-                    if(Tracer.IsDebugEnabled)
-                    {
-					    Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
-                    }
+					if(Tracer.IsDebugEnabled)
+					{
+						Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+					}
 
-                    this.session.Connection.Oneway(oldPendingAck);
+					this.session.Connection.Oneway(oldPendingAck);
 				}
 				else
 				{
-                    if(Tracer.IsDebugEnabled)
-                    {
-					    Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
-                    }
+					if(Tracer.IsDebugEnabled)
+					{
+						Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
+					}
 				}
 			}
 
@@ -1081,7 +1085,7 @@ namespace Apache.NMS.ActiveMQ
 					message = this.messageTransformation.TransformMessage<ActiveMQMessage>(newMessage);
 				}
 			}
-			
+
 			message.Connection = this.session.Connection;
 
 			if(IsClientAcknowledge)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=990885&r1=990884&r2=990885&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Mon Aug 30 18:04:21 2010
@@ -23,185 +23,185 @@ using Apache.NMS.ActiveMQ.Util;
 
 namespace Apache.NMS.ActiveMQ
 {
-    /// <summary>
-    /// An object capable of sending messages to some destination
-    /// </summary>
-    public class MessageProducer : IMessageProducer
-    {
-        private Session session;
-        private MemoryUsage usage = null;
-        private bool closed = false;
-        private object closedLock = new object();
-        private readonly ProducerInfo info;
-        private int producerSequenceId = 0;
-
-        private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
-        private TimeSpan requestTimeout;
-        private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
-        private MsgPriority msgPriority = NMSConstants.defaultPriority - 1;
-        private bool disableMessageID = false;
-        private bool disableMessageTimestamp = false;
-        protected bool disposed = false;
-
-        private MessageTransformation messageTransformation;
-
-        public MessageProducer(Session session, ProducerId id, ActiveMQDestination destination, TimeSpan requestTimeout)
-        {
-            this.session = session;            
-            this.RequestTimeout = requestTimeout;
-
-            this.info = new ProducerInfo();
-            this.info.ProducerId = id;
-            this.info.Destination = destination;
-            this.info.WindowSize = session.Connection.ProducerWindowSize;
-
-            this.messageTransformation = session.Connection.MessageTransformation;
-            
-            // If the destination contained a URI query, then use it to set public
-            // properties on the ProducerInfo
-            if(destination != null && destination.Options != null)
-            {
-                URISupport.SetProperties(this.info, destination.Options, "producer.");
-            }
-            
-            // Version Three and higher will send us a ProducerAck, but only if we
-            // have a set producer window size.
-            if(session.Connection.ProtocolVersion >= 3 && this.info.WindowSize > 0)
-            {
+	/// <summary>
+	/// An object capable of sending messages to some destination
+	/// </summary>
+	public class MessageProducer : IMessageProducer
+	{
+		private Session session;
+		private MemoryUsage usage = null;
+		private bool closed = false;
+		private object closedLock = new object();
+		private readonly ProducerInfo info;
+		private int producerSequenceId = 0;
+
+		private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
+		private TimeSpan requestTimeout;
+		private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
+		private MsgPriority msgPriority = NMSConstants.defaultPriority - 1;
+		private bool disableMessageID = false;
+		private bool disableMessageTimestamp = false;
+		protected bool disposed = false;
+
+		private MessageTransformation messageTransformation;
+
+		public MessageProducer(Session session, ProducerId id, ActiveMQDestination destination, TimeSpan requestTimeout)
+		{
+			this.session = session;
+			this.RequestTimeout = requestTimeout;
+
+			this.info = new ProducerInfo();
+			this.info.ProducerId = id;
+			this.info.Destination = destination;
+			this.info.WindowSize = session.Connection.ProducerWindowSize;
+
+			this.messageTransformation = session.Connection.MessageTransformation;
+
+			// If the destination contained a URI query, then use it to set public
+			// properties on the ProducerInfo
+			if(destination != null && destination.Options != null)
+			{
+				URISupport.SetProperties(this.info, destination.Options, "producer.");
+			}
+
+			// Version Three and higher will send us a ProducerAck, but only if we
+			// have a set producer window size.
+			if(session.Connection.ProtocolVersion >= 3 && this.info.WindowSize > 0)
+			{
 				Tracer.Debug("MessageProducer created with a Window Size of: " + this.info.WindowSize);
-                this.usage = new MemoryUsage(this.info.WindowSize);
-            }
-        }
-
-        ~MessageProducer()
-        {
-            Dispose(false);
-        }
-
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        protected void Dispose(bool disposing)
-        {
-            if(disposed)
-            {
-                return;
-            }
-
-            if(disposing)
-            {
-                // Dispose managed code here.
-            }
-
-            try
-            {
-                Close();
-            }
-            catch
-            {
-                // Ignore network errors.
-            }
-
-            disposed = true;
-        }
-
-        public void Close()
-        {
-            lock(closedLock)
-            {
-                if(closed)
-                {
-                    return;
-                }
-
-                DoClose();
-                RemoveInfo removeInfo = new RemoveInfo();
-                removeInfo.ObjectId = this.info.ProducerId;
-                this.session.Connection.Oneway(removeInfo);
-                this.session = null;
-            }
-        }
-
-        internal void DoClose()
-        {
-            lock(closedLock)
-            {
-                if(closed)
-                {
-                    return;
-                }
-
-                try
-                {
-                    session.RemoveProducer(info.ProducerId);
-                }
-                catch(Exception ex)
-                {
-                    Tracer.ErrorFormat("Error during producer close: {0}", ex);
-                }
-
-                if(this.usage != null)
-                {
-                    this.usage.Stop();
-                }
-
-                closed = true;
-            }
-        }
-
-        public void Send(IMessage message)
-        {
-            Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
-        }
-
-        public void Send(IDestination destination, IMessage message)
-        {
-            Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
-        }
-
-        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-        {
-            Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
-        }
-
-        public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-        {
-            Send(destination, message, deliveryMode, priority, timeToLive, true);
-        }
-
-        protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
-        {
-            if(null == destination)
-            {
-                // See if this producer was created without a destination.
-                if(null == info.Destination)
-                {
-                    throw new NotSupportedException();
-                }
-
-                // The producer was created with a destination, but an invalid destination
-                // was specified.
-                throw new Apache.NMS.InvalidDestinationException();
-            }
-
-            ActiveMQDestination dest = null;
-
-            if(destination == this.info.Destination)
-            {
-                dest = destination as ActiveMQDestination;
-            }
-            else if(info.Destination == null)
-            {
-                dest = ActiveMQDestination.Transform(destination);
-            }
-            else
-            {
-                throw new NotSupportedException("This producer can only send messages to: " + this.info.Destination.PhysicalName);
-            }
-			
+				this.usage = new MemoryUsage(this.info.WindowSize);
+			}
+		}
+
+		~MessageProducer()
+		{
+			Dispose(false);
+		}
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected void Dispose(bool disposing)
+		{
+			if(disposed)
+			{
+				return;
+			}
+
+			if(disposing)
+			{
+				// Dispose managed code here.
+			}
+
+			try
+			{
+				Close();
+			}
+			catch
+			{
+				// Ignore network errors.
+			}
+
+			disposed = true;
+		}
+
+		public void Close()
+		{
+			lock(closedLock)
+			{
+				if(closed)
+				{
+					return;
+				}
+
+				DoClose();
+				RemoveInfo removeInfo = new RemoveInfo();
+				removeInfo.ObjectId = this.info.ProducerId;
+				this.session.Connection.Oneway(removeInfo);
+				this.session = null;
+			}
+		}
+
+		internal void DoClose()
+		{
+			lock(closedLock)
+			{
+				if(closed)
+				{
+					return;
+				}
+
+				try
+				{
+					session.RemoveProducer(info.ProducerId);
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Error during producer close: {0}", ex);
+				}
+
+				if(this.usage != null)
+				{
+					this.usage.Stop();
+				}
+
+				closed = true;
+			}
+		}
+
+		public void Send(IMessage message)
+		{
+			Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+		}
+
+		public void Send(IDestination destination, IMessage message)
+		{
+			Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
+		}
+
+		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+		{
+			Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
+		}
+
+		public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+		{
+			Send(destination, message, deliveryMode, priority, timeToLive, true);
+		}
+
+		protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
+		{
+			if(null == destination)
+			{
+				// See if this producer was created without a destination.
+				if(null == info.Destination)
+				{
+					throw new NotSupportedException();
+				}
+
+				// The producer was created with a destination, but an invalid destination
+				// was specified.
+				throw new Apache.NMS.InvalidDestinationException();
+			}
+
+			ActiveMQDestination dest = null;
+
+			if(destination == this.info.Destination)
+			{
+				dest = destination as ActiveMQDestination;
+			}
+			else if(info.Destination == null)
+			{
+				dest = ActiveMQDestination.Transform(destination);
+			}
+			else
+			{
+				throw new NotSupportedException("This producer can only send messages to: " + this.info.Destination.PhysicalName);
+			}
+
 			if(this.ProducerTransformer != null)
 			{
 				IMessage transformed = this.ProducerTransformer(this.session, this, message);
@@ -211,147 +211,147 @@ namespace Apache.NMS.ActiveMQ
 				}
 			}
 
-            ActiveMQMessage activeMessage = this.messageTransformation.TransformMessage<ActiveMQMessage>(message);
+			ActiveMQMessage activeMessage = this.messageTransformation.TransformMessage<ActiveMQMessage>(message);
 
-            activeMessage.ProducerId = info.ProducerId;
-            activeMessage.Destination = dest;
-            activeMessage.NMSDeliveryMode = deliveryMode;
-            activeMessage.NMSPriority = priority;
-
-            // Always set the message Id regardless of the disable flag.
-            MessageId id = new MessageId();
-            id.ProducerId = info.ProducerId;
-            id.ProducerSequenceId = Interlocked.Increment(ref this.producerSequenceId);
-            activeMessage.MessageId = id;
-
-            if(!disableMessageTimestamp)
-            {
-                activeMessage.NMSTimestamp = DateTime.UtcNow;
-            }
-
-            if(specifiedTimeToLive)
-            {
-                activeMessage.NMSTimeToLive = timeToLive;
-            }
-
-            // Ensure there's room left to send this message
-            if(this.usage != null)
-            {
-                usage.WaitForSpace();
-            }
-
-            lock(closedLock)
-            {
-                if(closed)
-                {
-                    throw new ConnectionClosedException();
-                }
-
-                session.DoSend(activeMessage, this, this.usage, this.RequestTimeout);
-            }
-        }
-
-        public ProducerId ProducerId
-        {
-            get { return info.ProducerId; }
-        }
-
-        public ProducerInfo ProducerInfo
-        {
-            get { return info; }
-        }
-
-        public MsgDeliveryMode DeliveryMode
-        {
-            get { return msgDeliveryMode; }
-            set { this.msgDeliveryMode = value; }
-        }
-
-        public TimeSpan TimeToLive
-        {
-            get { return msgTimeToLive; }
-            set { this.msgTimeToLive = value; }
-        }
-
-        public TimeSpan RequestTimeout
-        {
-            get { return requestTimeout; }
-            set { this.requestTimeout = value; }
-        }
-
-        public MsgPriority Priority
-        {
-            get { return msgPriority; }
-            set { this.msgPriority = value; }
-        }
-
-        public bool DisableMessageID
-        {
-            get { return disableMessageID; }
-            set { this.disableMessageID = value; }
-        }
-
-        public bool DisableMessageTimestamp
-        {
-            get { return disableMessageTimestamp; }
-            set { this.disableMessageTimestamp = value; }
-        }
-		
-        private ProducerTransformerDelegate producerTransformer;
-        public ProducerTransformerDelegate ProducerTransformer
-        {
-            get { return this.producerTransformer; }
-            set { this.producerTransformer = value; }
-        }
-
-        public IMessage CreateMessage()
-        {
-            return session.CreateMessage();
-        }
-
-        public ITextMessage CreateTextMessage()
-        {
-            return session.CreateTextMessage();
-        }
-
-        public ITextMessage CreateTextMessage(string text)
-        {
-            return session.CreateTextMessage(text);
-        }
-
-        public IMapMessage CreateMapMessage()
-        {
-            return session.CreateMapMessage();
-        }
-
-        public IObjectMessage CreateObjectMessage(object body)
-        {
-            return session.CreateObjectMessage(body);
-        }
-
-        public IBytesMessage CreateBytesMessage()
-        {
-            return session.CreateBytesMessage();
-        }
-
-        public IBytesMessage CreateBytesMessage(byte[] body)
-        {
-            return session.CreateBytesMessage(body);
-        }
-
-        public IStreamMessage CreateStreamMessage()
-        {
-            return session.CreateStreamMessage();
-        }
-
-        internal void OnProducerAck(ProducerAck ack)
-        {
-            Tracer.Debug("Received ProducerAck for Message of Size = {" + ack.Size + "}" );
-
-            if(this.usage != null)
-            {
-                this.usage.DecreaseUsage( ack.Size );
-            }
-        }
-    }
+			activeMessage.ProducerId = info.ProducerId;
+			activeMessage.Destination = dest;
+			activeMessage.NMSDeliveryMode = deliveryMode;
+			activeMessage.NMSPriority = priority;
+
+			// Always set the message Id regardless of the disable flag.
+			MessageId id = new MessageId();
+			id.ProducerId = info.ProducerId;
+			id.ProducerSequenceId = Interlocked.Increment(ref this.producerSequenceId);
+			activeMessage.MessageId = id;
+
+			if(!disableMessageTimestamp)
+			{
+				activeMessage.NMSTimestamp = DateTime.UtcNow;
+			}
+
+			if(specifiedTimeToLive)
+			{
+				activeMessage.NMSTimeToLive = timeToLive;
+			}
+
+			// Ensure there's room left to send this message
+			if(this.usage != null)
+			{
+				usage.WaitForSpace();
+			}
+
+			lock(closedLock)
+			{
+				if(closed)
+				{
+					throw new ConnectionClosedException();
+				}
+
+				session.DoSend(activeMessage, this, this.usage, this.RequestTimeout);
+			}
+		}
+
+		public ProducerId ProducerId
+		{
+			get { return info.ProducerId; }
+		}
+
+		public ProducerInfo ProducerInfo
+		{
+			get { return info; }
+		}
+
+		public MsgDeliveryMode DeliveryMode
+		{
+			get { return msgDeliveryMode; }
+			set { this.msgDeliveryMode = value; }
+		}
+
+		public TimeSpan TimeToLive
+		{
+			get { return msgTimeToLive; }
+			set { this.msgTimeToLive = value; }
+		}
+
+		public TimeSpan RequestTimeout
+		{
+			get { return requestTimeout; }
+			set { this.requestTimeout = value; }
+		}
+
+		public MsgPriority Priority
+		{
+			get { return msgPriority; }
+			set { this.msgPriority = value; }
+		}
+
+		public bool DisableMessageID
+		{
+			get { return disableMessageID; }
+			set { this.disableMessageID = value; }
+		}
+
+		public bool DisableMessageTimestamp
+		{
+			get { return disableMessageTimestamp; }
+			set { this.disableMessageTimestamp = value; }
+		}
+
+		private ProducerTransformerDelegate producerTransformer;
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
+
+		public IMessage CreateMessage()
+		{
+			return session.CreateMessage();
+		}
+
+		public ITextMessage CreateTextMessage()
+		{
+			return session.CreateTextMessage();
+		}
+
+		public ITextMessage CreateTextMessage(string text)
+		{
+			return session.CreateTextMessage(text);
+		}
+
+		public IMapMessage CreateMapMessage()
+		{
+			return session.CreateMapMessage();
+		}
+
+		public IObjectMessage CreateObjectMessage(object body)
+		{
+			return session.CreateObjectMessage(body);
+		}
+
+		public IBytesMessage CreateBytesMessage()
+		{
+			return session.CreateBytesMessage();
+		}
+
+		public IBytesMessage CreateBytesMessage(byte[] body)
+		{
+			return session.CreateBytesMessage(body);
+		}
+
+		public IStreamMessage CreateStreamMessage()
+		{
+			return session.CreateStreamMessage();
+		}
+
+		internal void OnProducerAck(ProducerAck ack)
+		{
+			Tracer.Debug("Received ProducerAck for Message of Size = {" + ack.Size + "}" );
+
+			if(this.usage != null)
+			{
+				this.usage.DecreaseUsage( ack.Size );
+			}
+		}
+	}
 }



Mime
View raw message