activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r818220 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Connection.cs MessageProducer.cs Session.cs
Date Wed, 23 Sep 2009 19:33:05 GMT
Author: tabish
Date: Wed Sep 23 19:33:05 2009
New Revision: 818220

URL: http://svn.apache.org/viewvc?rev=818220&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-189

Add changes for sending message async or sync based on whether messages are sent with the
following rules.

Sent Async when all the following are true.
1. There is no send timeout.
2. Message doesn't have its response required feild set.
3. Connection is not configured for Always send Synchronously.

Plus any of the following is also true

1. Message is Non-Persistent.
2. Connection is configured for Async Sends.
3. Message is part of a Transaction.

Otherwise messages are send Synchronously.  

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=818220&r1=818219&r2=818220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Wed Sep
23 19:33:05 2009
@@ -37,12 +37,16 @@
 		private BrokerInfo brokerInfo; // from broker
 		private WireFormatInfo brokerWireFormatInfo; // from broker
 		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+        private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
 		/// <summary>
 		/// Private object used for synchronization, instead of public "this"
 		/// </summary>
 		private readonly object myLock = new object();
 		private bool asyncSend = false;
+        private bool alwaysSyncSend = false;
 		private bool asyncClose = true;
+        private bool copyMessageOnSend = true;
+        private int producerWindowSize = 0;
 		private bool connected = false;
 		private bool closed = false;
 		private bool closing = false;
@@ -103,7 +107,46 @@
 		{
 			set { this.acknowledgementMode = NMSConvert.ToAcknowledgementMode(value); }
 		}
-
+        
+        /// <summary>
+        /// This property is the maximum number of bytes in memory that a producer will transmit

+        /// to a broker before waiting for acknowledgement messages from the broker that
it has 
+        /// accepted the previously sent messages. In other words, this how you configure
the 
+        /// producer flow control window that is used for async sends where the client is
responsible 
+        /// for managing memory usage. The default value of 0 means no flow control at the
client
+        /// </summary>
+        public int ProducerWindowSize
+        {
+            get { return producerWindowSize; }
+            set { producerWindowSize = value; }
+        }
+        
+        /// <summary>
+        /// This property forces all messages that are sent to be sent synchronously overriding
+        /// any usage of the AsyncSend flag. This can reduce performance in some cases since
the 
+        /// only messages we normally send synchronously are Persistent messages not sent
in a 
+        /// transaction. This options guarantees that no send will return until the broker
has 
+        /// acknowledge receipt of the message
+        /// </summary>
+        public bool AlwaysSyncSend
+        {
+            get { return alwaysSyncSend; }
+            set { alwaysSyncSend = value; }
+        }
+
+        /// <summary>
+        /// This property indicates whether Message's should be copied before being sent
via
+        /// one of the Connection's send methods.  Copying the Message object allows the
user
+        /// to resuse the Object over for another send.  If the message isn't copied performance
+        /// can improve but the user must not reuse the Object as it may not have been sent
+        /// before they reset its payload.
+        /// </summary>
+        public bool CopyMessageOnSend
+        {
+            get { return copyMessageOnSend; }
+            set { copyMessageOnSend = value; }
+        }
+        
 		#endregion
 
 		/// <summary>
@@ -192,6 +235,16 @@
 				sessions.Remove(session);
 			}
 		}
+        
+        public void addProducer( ProducerId id, MessageProducer producer ) 
+        {
+            this.producers.Add( id, producer );
+        }
+        
+        public void removeProducer( ProducerId id )
+        {
+            this.producers.Remove( id );
+        }   
 
 		public void Close()
 		{
@@ -455,6 +508,16 @@
 					OnException(commandTransport, new NMSException("Broker closed this connection."));
 				}
 			}
+			else if(command is ProducerAck)
+			{
+                ProducerAck ack = (ProducerAck) command;
+                if(ack != null && ack.ProducerId != null) {
+                    MessageProducer producer = (MessageProducer) producers[ack.ProducerId];
+                    if( producer != null ) {
+                        producer.OnProducerAck(ack);
+                    }
+                }
+			}
 			else if(command is ConnectionError)
 			{
 				if(!closing && !closed)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=818220&r1=818219&r2=818220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
Wed Sep 23 19:33:05 2009
@@ -17,6 +17,7 @@
 
 using System;
 using System.Threading;
+using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ
@@ -27,10 +28,11 @@
 	public class MessageProducer : IMessageProducer
 	{
 		private Session session;
+        private MemoryUsage usage = null;
 		private bool closed = false;
         private object closedLock = new object();
 		private readonly ProducerInfo info;
-		private int messageCounter = 0;
+        private long producerSequenceId = 0;
 
 		private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
 		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
@@ -100,6 +102,11 @@
 					Tracer.ErrorFormat("Error during producer close: {0}", ex);
 				}
 
+                if(this.usage != null)
+                {
+                    this.usage.Stop();
+                }
+                
 				session = null;
 				closed = true;
 			}
@@ -142,19 +149,17 @@
 
 			ActiveMQMessage activeMessage = (ActiveMQMessage) message;
 
-			if(!disableMessageID)
-			{
-				MessageId id = new MessageId();
-				id.ProducerId = info.ProducerId;
-				id.ProducerSequenceId = Interlocked.Increment(ref messageCounter);
-				activeMessage.MessageId = id;
-			}
-
 			activeMessage.ProducerId = info.ProducerId;
 			activeMessage.FromDestination = destination;
 			activeMessage.NMSDeliveryMode = deliveryMode;
 			activeMessage.NMSPriority = priority;
 
+            // Always set the message Id regardless of the disable flag.
+            MessageId id = new MessageId();
+            id.ProducerId = info.ProducerId;
+            id.ProducerSequenceId = Interlocked.Increment(ref this.producerSequenceId);
+            activeMessage.MessageId = id;
+            
 			if(!disableMessageTimestamp)
 			{
 				activeMessage.NMSTimestamp = DateTime.UtcNow;
@@ -165,6 +170,12 @@
 				activeMessage.NMSTimeToLive = timeToLive;
 			}
 
+            // Ensure there's room left to send this message            
+            if(this.usage != null)
+            {
+                usage.WaitForSpace();
+            }
+            
 			lock(closedLock)
 			{
 				if(closed)
@@ -172,16 +183,15 @@
 					throw new ConnectionClosedException();
 				}
 
-				if(session.Transacted)
-				{
-					session.DoStartTransaction();
-					activeMessage.TransactionId = session.TransactionContext.TransactionId;
-				}
-
-				session.DoSend(activeMessage, this.RequestTimeout);
+				session.DoSend(activeMessage, this, this.usage, this.RequestTimeout);
 			}
 		}
 
+        public ProducerId ProducerId
+        {
+            get { return info.ProducerId; }
+        }
+        
 		public MsgDeliveryMode DeliveryMode
 		{
 			get { return msgDeliveryMode; }
@@ -252,5 +262,13 @@
 		{
 			return session.CreateBytesMessage(body);
 		}
+        
+        public void OnProducerAck(ProducerAck ack)
+        {
+            if(this.usage != null)
+            {
+                this.usage.DecreaseUsage( ack.Size );
+            }
+        }
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=818220&r1=818219&r2=818220&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Wed Sep
23 19:33:05 2009
@@ -44,7 +44,7 @@
         private bool closed = false;
         private bool closing = false;
         private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
-
+        
         public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
         {
             this.connection = connection;
@@ -470,12 +470,12 @@
             this.DoSend(command);
         }
 
-        public void DoSend(Command message)
+        private void DoSend(Command message)
         {
             this.DoSend(message, this.RequestTimeout);
         }
 
-        public void DoSend(Command message, TimeSpan requestTimeout)
+        private void DoSend(Command message, TimeSpan requestTimeout)
         {
             if(AsyncSend)
             {
@@ -488,6 +488,54 @@
             }
         }
 
+        public void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage
producerWindow, TimeSpan sendTimeout )
+        {
+            ActiveMQMessage msg = message;
+            
+            if(Transacted)
+            {
+                DoStartTransaction();
+                msg.TransactionId = TransactionContext.TransactionId;
+            }
+                        
+            msg.RedeliveryCounter = 0;
+            msg.BrokerPath = null;
+
+            if(this.connection.CopyMessageOnSend)
+            {
+                msg = (ActiveMQMessage)msg.Clone();
+            }
+            
+            msg.OnSend();
+            msg.ProducerId = msg.MessageId.ProducerId;
+
+            if(sendTimeout.TotalMilliseconds <= 0 && !msg.ResponseRequired &&
!connection.AlwaysSyncSend && 
+               (!msg.Persistent || connection.AsyncSend || msg.TransactionId != null))
+            {
+                this.connection.Oneway(msg);
+                
+                if(producerWindow != null) 
+                {
+                    // Since we defer lots of the marshaling till we hit the wire, this 
+                    // might not provide and accurate size. We may change over to doing
+                    // more aggressive marshaling, to get more accurate sizes.. this is more

+                    // important once users start using producer window flow control.
+                    producerWindow.IncreaseUsage(msg.Size());
+                }
+            } 
+            else 
+            {
+                if(sendTimeout.TotalMilliseconds > 0)
+                {
+                    this.connection.SyncRequest(msg, sendTimeout);
+                }
+                else
+                {
+                    this.connection.SyncRequest(msg);
+                }
+            }            
+        }
+
         /// <summary>
         /// Ensures that a transaction is started
         /// </summary>
@@ -563,7 +611,7 @@
             answer.Exclusive = this.Exclusive;
             answer.DispatchAsync = this.DispatchAsync;
             answer.Retroactive = this.Retroactive;
-			answer.MaximumPendingMessageLimit = this.MaximumPendingMessageLimit;
+            answer.MaximumPendingMessageLimit = this.MaximumPendingMessageLimit;
 
             // If the destination contained a URI query, then use it to set public properties
             // on the ConsumerInfo
@@ -585,6 +633,7 @@
             id.Value = Interlocked.Increment(ref producerCounter);
             answer.ProducerId = id;
             answer.Destination = ActiveMQDestination.Transform(destination);
+            answer.WindowSize = connection.ProducerWindowSize;
 
             // If the destination contained a URI query, then use it to set public
             // properties on the ProducerInfo



Mime
View raw message