activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r891876 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp: Connection.cs MessageConsumer.cs MessageProducer.cs Session.cs TransactionContext.cs
Date Thu, 17 Dec 2009 20:34:10 GMT
Author: tabish
Date: Thu Dec 17 20:34:10 2009
New Revision: 891876

URL: http://svn.apache.org/viewvc?rev=891876&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQNET-220

Attempting to fix redelivery issues.  Found some things that the client was not doing correctly
when sending various messages.  This submission adds the option to configure whether acks are
sent async or sync (Connection.SendAcksAsync); the session also looks at its ack mode, and
Transacted sessions always send their acks async (one-way).  Added the DispatchAsync option to
Connection to allow the connection to specify whether consumers should configure themselves with
the broker as asyncDispatch or not; the default is true.  Also cleaned up some logging that
duplicates what can be achieved by using the transport.UseLogging=true option.

Still have an issue with messages getting tagged on the broker as redelivered when they
actually aren't.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs?rev=891876&r1=891875&r2=891876&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs
Thu Dec 17 20:34:10 2009
@@ -46,6 +46,8 @@
         private bool asyncClose = true;
         private bool useCompression = false;
         private bool copyMessageOnSend = true;
+        private bool sendAcksAsync = false;
+        private bool dispatchAsync = true;
         private int producerWindowSize = 0;
         private bool connected = false;
         private bool closed = false;
@@ -136,6 +138,17 @@
         }
 
         /// <summary>
+        /// This property indicates whether or not async sends are used for
+        /// message acknowledgement messages.  Sending Acks async can improve
+        /// performance but may decrease reliability.
+        /// </summary>
+        public bool SendAcksAsync
+        {
+            get { return sendAcksAsync; }
+            set { sendAcksAsync = value; }
+        }
+
+        /// <summary>
         /// This property sets the acknowledgment mode for the connection.
         /// The URI parameter connection.ackmode can be set to a string value
         /// that maps to the enumeration value.
@@ -225,6 +238,15 @@
             set { this.acknowledgementMode = value; }
         }
 
+        /// <summary>
+        /// Indicates whether messages are dispatched to consumers synchronously or asynchronously by the broker.
+        /// </summary>
+        public bool DispatchAsync
+        {
+            get { return this.dispatchAsync; }
+            set { this.dispatchAsync = value; }
+        }
+
         public string ClientId
         {
             get { return info.ClientId; }
@@ -338,7 +360,7 @@
         {
             SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode);
             SyncRequest(info, this.RequestTimeout);
-            Session session = new Session(this, info, sessionAcknowledgementMode);
+            Session session = new Session(this, info, sessionAcknowledgementMode, this.dispatchAsync);
 
             // Set properties on session using parameters prefixed with "session."
             URISupport.CompositeData c = URISupport.parseComposite(this.brokerUri);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs?rev=891876&r1=891875&r2=891876&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs
Thu Dec 17 20:34:10 2009
@@ -279,7 +279,7 @@
 
 				if(!this.session.IsTransacted)
 				{
-					lock(this.dispatchedMessages)
+                    lock(this.dispatchedMessages)
 					{
 						dispatchedMessages.Clear();
 					}
@@ -288,7 +288,7 @@
 				this.unconsumedMessages.Close();
 				this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
 
-				RemoveInfo removeCommand = new RemoveInfo();
+                RemoveInfo removeCommand = new RemoveInfo();
 				removeCommand.ObjectId = this.info.ConsumerId;
 				removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
 
@@ -352,7 +352,7 @@
 			ack.MessageCount = 1;
 
 			Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString());
-			this.session.Connection.Oneway(ack);
+			this.session.SendAck(ack);
 		}
 
 		protected void DoNothingAcknowledge(ActiveMQMessage message)
@@ -430,7 +430,7 @@
 
 					try
 					{
-						this.session.Connection.Oneway(ackToSend);
+						this.session.SendAck(ackToSend);
 					}
 					catch(Exception e)
 					{
@@ -685,7 +685,7 @@
 								if(ack != null)
 								{
 									this.dispatchedMessages.Clear();
-									this.session.Connection.Oneway(ack);
+									this.session.SendAck(ack);
 								}
 							}
 						}
@@ -828,7 +828,7 @@
 					ack.TransactionId = this.session.TransactionContext.TransactionId;
 				}
 
-				this.session.Connection.Oneway(ack);
+				this.session.SendAck(ack);
 				this.pendingAck = null;
 
 				// Adjust the counters
@@ -890,7 +890,7 @@
 						ack.MessageCount = this.dispatchedMessages.Count;
 						ack.FirstMessageId = firstMsgId;
 
-						this.session.Connection.Oneway(ack);
+						this.session.SendAck(ack);
 
 						// Adjust the window size.
 						additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
@@ -911,7 +911,7 @@
 							ack.MessageCount = this.dispatchedMessages.Count;
 							ack.FirstMessageId = firstMsgId;
 
-							this.session.Connection.Oneway(ack);
+							this.session.SendAck(ack);
 						}
 
 						// stop the delivery of messages.

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs?rev=891876&r1=891875&r2=891876&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs
Thu Dec 17 20:34:10 2009
@@ -38,7 +38,7 @@
         private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
         private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
         private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
-        private MsgPriority msgPriority = NMSConstants.defaultPriority;
+        private MsgPriority msgPriority = NMSConstants.defaultPriority - 1;
         private bool disableMessageID = false;
         private bool disableMessageTimestamp = false;
         protected bool disposed = false;
@@ -133,7 +133,7 @@
                 }
 
                 closed = true;
-            }            
+            }
         }
 
         public void Send(IMessage message)
@@ -302,12 +302,12 @@
             return session.CreateBytesMessage(body);
         }
 
-		public IStreamMessage CreateStreamMessage()
-		{
-			return session.CreateStreamMessage();
-		}
+        public IStreamMessage CreateStreamMessage()
+        {
+            return session.CreateStreamMessage();
+        }
 
-		public void OnProducerAck(ProducerAck ack)
+        public void OnProducerAck(ProducerAck ack)
         {
             Tracer.Debug("Received ProducerAck for Message of Size = {" + ack.Size + "}" );
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs?rev=891876&r1=891875&r2=891876&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs
Thu Dec 17 20:34:10 2009
@@ -33,10 +33,10 @@
         /// Private object used for synchronization, instead of public "this"
         /// </summary>
         private readonly object myLock = new object();
-        
+
         private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
         private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
-        
+
         private SessionExecutor executor;
         private TransactionContext transactionContext;
         private Connection connection;
@@ -44,8 +44,8 @@
         private bool dispatchAsync;
         private bool exclusive;
         private bool retroactive;
-        private byte priority;
-        
+        private byte priority = 4;
+
         private readonly SessionInfo info;
         private int consumerCounter;
         private int producerCounter;
@@ -57,12 +57,13 @@
         private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
         private AcknowledgementMode acknowledgementMode;
 
-        public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
+        public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode, bool dispatchAsync)
         {
             this.connection = connection;
             this.info = info;
             this.acknowledgementMode = acknowledgementMode;
             this.requestTimeout = connection.RequestTimeout;
+            this.dispatchAsync = dispatchAsync;
 
             if(acknowledgementMode == AcknowledgementMode.Transactional)
             {
@@ -201,9 +202,9 @@
         {
             get { return Interlocked.Increment(ref this.nextDeliveryId); }
         }
-        
+
         #endregion
-        
+
         #region ISession Members
 
         public void Dispose()
@@ -284,7 +285,7 @@
 
                     // Stop all message deliveries from this Session
                     Stop();
-                    
+
                     lock(consumers.SyncRoot)
                     {
                         foreach(MessageConsumer consumer in consumers.Values)
@@ -315,8 +316,8 @@
                         catch
                         {
                         }
-                    }                    
-                    
+                    }
+
                     Connection.RemoveSession(this);
                 }
                 catch(Exception ex)
@@ -328,9 +329,9 @@
                     this.closed = true;
                     this.closing = false;
                 }
-            }            
+            }
         }
-        
+
         public IMessageProducer CreateProducer()
         {
             return CreateProducer(null);
@@ -400,7 +401,7 @@
                 {
                     consumer.Start();
                 }
-                
+
                 return consumer;
             }
             catch(Exception)
@@ -430,7 +431,7 @@
 
             // Registered with Connection before we register at the broker.
             connection.addDispatcher(consumerId, this);
-            
+
             try
             {
                 consumer = new MessageConsumer(this, command);
@@ -441,7 +442,7 @@
                 {
                     consumer.Start();
                 }
-                
+
                 this.connection.SyncRequest(command);
             }
             catch(Exception)
@@ -475,7 +476,7 @@
         {
             throw new NotSupportedException("Not Yet Implemented");
         }
-        
+
         public IQueue GetQueue(string name)
         {
             return new ActiveMQQueue(name);
@@ -548,12 +549,12 @@
             return ConfigureMessage(answer) as IBytesMessage;
         }
 
-		public IStreamMessage CreateStreamMessage()
-		{
-			return ConfigureMessage(new ActiveMQStreamMessage()) as IStreamMessage;
-		}
-		
-		public IObjectMessage CreateObjectMessage(object body)
+        public IStreamMessage CreateStreamMessage()
+        {
+            return ConfigureMessage(new ActiveMQStreamMessage()) as IStreamMessage;
+        }
+
+        public IObjectMessage CreateObjectMessage(object body)
         {
             ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
             answer.Body = body;
@@ -568,7 +569,7 @@
                         "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
                         + this.AcknowledgementMode);
             }
-            
+
             this.TransactionContext.Commit();
         }
 
@@ -580,7 +581,7 @@
                         "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
                         + this.AcknowledgementMode);
             }
-            
+
             this.TransactionContext.Rollback();
         }
 
@@ -659,7 +660,7 @@
         {
             connection.removeDispatcher(objectId);
             this.lastDeliveredSequenceId = Math.Min(this.lastDeliveredSequenceId, lastDeliveredSequenceId);
-            
+
             if(!this.closing)
             {
                 consumers.Remove(objectId);
@@ -699,7 +700,7 @@
             {
                 answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
             }
-            
+
             // If the destination contained a URI query, then use it to set public properties
             // on the ConsumerInfo
             ActiveMQDestination amqDestination = destination as ActiveMQDestination;
@@ -747,11 +748,11 @@
             {
                 consumer.Start();
             }
-            
+
             if(this.executor != null)
             {
                 this.executor.Start();
-            }            
+            }
         }
 
         public bool Started
@@ -769,32 +770,24 @@
 
             foreach(MessageDispatch message in messages)
             {
-                if(Tracer.IsDebugEnabled)
-                {
-                    Tracer.DebugFormat("Resending Message Dispatch: ", message.ToString());
-                }
                 this.executor.ExecuteFirst(message);
             }
         }
-        
+
         public void Dispatch(MessageDispatch dispatch)
         {
             if(this.executor != null)
             {
-                if(Tracer.IsDebugEnabled)
-                {
-                    Tracer.DebugFormat("Send Message Dispatch: ", dispatch.ToString());
-                }
                 this.executor.Execute(dispatch);
             }
         }
 
-        internal void ClearMessagesInProgress() 
-        {        
+        internal void ClearMessagesInProgress()
+        {
             if( this.executor != null ) {
                 this.executor.ClearMessagesInProgress();
             }
-        
+
             lock(this.consumers.SyncRoot)
             {
                 foreach(MessageConsumer consumer in this.consumers)
@@ -811,7 +804,7 @@
                 foreach(MessageConsumer consumer in this.consumers.Values)
                 {
                     consumer.Acknowledge();
-                }                
+                }
             }
         }
 
@@ -828,6 +821,23 @@
             return message;
         }
 
+        internal void SendAck(MessageAck ack)
+        {
+            this.SendAck(ack, false);
+        }
+
+        internal void SendAck(MessageAck ack, bool lazy)
+        {
+            if(lazy || connection.SendAcksAsync || this.IsTransacted )
+            {
+                this.connection.Oneway(ack);
+            }
+            else
+            {
+                this.connection.SyncRequest(ack);
+            }
+        }
+
         /// <summary>
         /// Prevents message from throwing an exception if a client calls Acknoweldge on
         /// a message that is part of a transaction either being produced or consumed.  The
@@ -840,6 +850,6 @@
         private void DoNothingAcknowledge(ActiveMQMessage message)
         {
         }
-        
+
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs?rev=891876&r1=891875&r2=891876&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs
Thu Dec 17 20:34:10 2009
@@ -75,6 +75,11 @@
                 info.Type = (int) TransactionType.Begin;
                 
                 this.session.Connection.Oneway(info);
+
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("Begin:" + this.transactionId.ToString());
+                }
             }
         }
         
@@ -87,6 +92,13 @@
 
             this.BeforeEnd();
 
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("Rollback: "  + this.transactionId +
+                             " syncCount: " +
+                             (synchronizations != null ? synchronizations.Count : 0));
+            }
+
             TransactionInfo info = new TransactionInfo();
             info.ConnectionId = this.session.Connection.ConnectionId;
             info.TransactionId = transactionId;
@@ -107,7 +119,14 @@
             }
 
             this.BeforeEnd();
-            
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("Commit: "  + this.transactionId +
+                             " syncCount: " +
+                             (synchronizations != null ? synchronizations.Count : 0));
+            }
+
             TransactionInfo info = new TransactionInfo();
             info.ConnectionId = this.session.Connection.ConnectionId;
             info.TransactionId = transactionId;



Mime
View raw message