activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r961505 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Connection.cs MessageConsumer.cs Session.cs State/ConnectionState.cs State/ConnectionStateTracker.cs Transport/Failover/FailoverTransport.cs
Date Wed, 07 Jul 2010 20:50:18 GMT
Author: tabish
Date: Wed Jul  7 20:50:17 2010
New Revision: 961505

URL: http://svn.apache.org/viewvc?rev=961505&view=rev
Log:
Update the State Tracker and Failover Transport to allow the Consumers to control when their
processing of connection interruption is complete.  Perform the Consumer clear operation asynchronously
to avoid lock contention.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Wed Jul  7 20:50:17 2010
@@ -20,6 +20,7 @@ using System.Collections;
 using System.Threading;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Failover;
 using Apache.NMS.ActiveMQ.Util;
 using Apache.NMS.Util;
 
@@ -67,6 +68,7 @@ namespace Apache.NMS.ActiveMQ
         private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
         private ICompressionPolicy compressionPolicy = new CompressionPolicy();
         private IdGenerator clientIdGenerator;
+        private volatile CountDownLatch transportInterruptionProcessingComplete;
 
         public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
         {
@@ -641,6 +643,7 @@ namespace Apache.NMS.ActiveMQ
         {
             if(command is MessageDispatch)
             {
+                WaitForTransportInterruptionProcessingToComplete();
                 DispatchMessage((MessageDispatch) command);
             }
             else if(command is KeepAliveInfo)
@@ -767,6 +770,12 @@ namespace Apache.NMS.ActiveMQ
         {
             Tracer.Debug("Connection: Transport has been Interrupted.");
 
+            this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
+            }
+
             foreach(Session session in this.sessions)
             {
                 try
@@ -890,5 +899,62 @@ namespace Apache.NMS.ActiveMQ
             this.Oneway(command);
         }
 
+        private void WaitForTransportInterruptionProcessingToComplete()
+        {
+            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+            if(cdl != null)
+            {
+                if(!closed && cdl.Remaining > 0)
+                {
+                    Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption " +
+                                "processing (" + cdl.Remaining + ") to complete..");
+                    cdl.await(TimeSpan.FromSeconds(10));
+                }
+
+                SignalInterruptionProcessingComplete();
+            }
+        }
+
+        internal void TransportInterruptionProcessingComplete()
+        {
+            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+            if(cdl != null)
+            {
+                cdl.countDown();
+                try
+                {
+                    SignalInterruptionProcessingComplete();
+                }
+                catch
+                {
+                }
+            }
+        }
+    
+        private void SignalInterruptionProcessingComplete()
+        {
+            CountDownLatch cdl = this.transportInterruptionProcessingComplete;
+            if(cdl.Remaining == 0)
+            {
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("transportInterruptionProcessingComplete for: " + this.info.ConnectionId);
+                }
+                this.transportInterruptionProcessingComplete = null;
+
+                FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+                if(failoverTransport != null)
+                {
+                    failoverTransport.ConnectionInterruptProcessingComplete(this.info.ConnectionId);
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("notified failover transport (" + failoverTransport +
+                                     ") of interruption completion for: " + this.info.ConnectionId);
+                    }
+                }
+    
+            }
+        }
+
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Jul  7 20:50:17 2010
@@ -57,6 +57,7 @@ namespace Apache.NMS.ActiveMQ
 		private int dispatchedCount = 0;
 		private volatile bool synchronizationRegistered = false;
 		private bool clearDispatchList = false;
+        private bool inProgressClearRequiredFlag;
 
 		private const int DEFAULT_REDELIVERY_DELAY = 0;
 		private const int DEFAULT_MAX_REDELIVERIES = 5;
@@ -421,17 +422,38 @@ namespace Apache.NMS.ActiveMQ
 			this.unconsumedMessages.Stop();
 		}
 
-		public void ClearMessagesInProgress()
-		{
-			// we are called from inside the transport reconnection logic
-			// which involves us clearing all the connections' consumers
-			// dispatch lists and clearing them
-			// so rather than trying to grab a mutex (which could be already
-			// owned by the message listener calling the send) we will just set
-			// a flag so that the list can be cleared as soon as the
-			// dispatch thread is ready to flush the dispatch list
-			this.clearDispatchList = true;
-		}
+        internal void InProgressClearRequired()
+        {
+            inProgressClearRequiredFlag = true;
+            // deal with delivered messages async to avoid lock contention with in progress acks
+            clearDispatchList = true;
+        }
+
+        internal void ClearMessagesInProgress()
+        {
+            if(inProgressClearRequiredFlag)
+            {
+                // Called from a thread in the ThreadPool, so we wait until we can
+                // get a lock on the unconsumed list then we clear it.
+                lock(this.unconsumedMessages)
+                {
+                    if(inProgressClearRequiredFlag)
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug(this.ConsumerId + " clearing dispatched list (" +
+                                         this.unconsumedMessages.Count + ") on transport interrupt");
+                        }
+
+                        this.unconsumedMessages.Clear();
+
+                        // allow dispatch on this connection to resume
+                        this.session.Connection.TransportInterruptionProcessingComplete();
+                        this.inProgressClearRequiredFlag = false;
+                    }
+                }
+            }
+        }
 
 		public void DeliverAcks()
 		{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Wed Jul  7 20:50:17 2010
@@ -793,15 +793,32 @@ namespace Apache.NMS.ActiveMQ
                 this.executor.ClearMessagesInProgress();
             }
 
+            // Because we are called from inside the Transport Reconnection logic
+            // we spawn the Consumer clear to another Thread so that we can avoid
+            // any lock contention that might exist between the consumer and the
+            // connection that is reconnecting.
             lock(this.consumers.SyncRoot)
             {
                 foreach(MessageConsumer consumer in this.consumers.Values)
                 {
-                    consumer.ClearMessagesInProgress();
+                    consumer.InProgressClearRequired();
+                    ThreadPool.QueueUserWorkItem(ClearMessages, consumer);
                 }
             }
         }
 
+        private void ClearMessages(object value)
+        {
+            MessageConsumer consumer = value as MessageConsumer;
+
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("Performing Async Clear of In Progress Messages on Consumer: " + consumer.ConsumerId);
+            }
+
+            consumer.ClearMessagesInProgress();
+        }
+
         internal void Acknowledge()
         {
             lock(this.consumers.SyncRoot)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs Wed Jul  7 20:50:17 2010
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Collections.Generic;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.Util;
 
@@ -29,6 +30,9 @@ namespace Apache.NMS.ActiveMQ.State
 		private AtomicDictionary<SessionId, SessionState> sessions = new AtomicDictionary<SessionId, SessionState>();
 		private AtomicCollection<DestinationInfo> tempDestinations = new AtomicCollection<DestinationInfo>();
 		private Atomic<bool> _shutdown = new Atomic<bool>(false);
+	    private bool connectionInterruptProcessingComplete = true;
+		private Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = 
+			new Dictionary<ConsumerId, ConsumerInfo>();
 
 		public ConnectionState(ConnectionInfo info)
 		{
@@ -75,21 +79,6 @@ namespace Apache.NMS.ActiveMQ.State
 			transactions.Add(id, new TransactionState(id));
 		}
 
-		/*
-		public TransactionState getTransactionState(TransactionId id) {
-			return transactions[id];
-		}
-
-		public SynchronizedCollection<TransactionState> getTransactionStates() {
-			return transactions.Values;
-		}
-
-		public SessionState getSessionState(SessionId id) {
-			return sessions[id];
-		}
-
-		*/
-
 		public TransactionState this[TransactionId id]
 		{
 			get
@@ -192,6 +181,17 @@ namespace Apache.NMS.ActiveMQ.State
 				throw new ApplicationException("Disposed");
 			}
 		}
+		
+		public Dictionary<ConsumerId, ConsumerInfo> RecoveringPullConsumers
+		{
+			get { return this.recoveringPullConsumers; }
+		}
+		
+		public bool ConnectionInterruptProcessingComplete
+		{
+			get { return this.connectionInterruptProcessingComplete; }
+			set { this.connectionInterruptProcessingComplete = value; }
+		}
 
 		public void shutdown()
 		{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Wed Jul  7 20:50:17 2010
@@ -18,6 +18,7 @@
 using System;
 using System.Collections.Generic;
 
+using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
 
@@ -175,10 +176,37 @@ namespace Apache.NMS.ActiveMQ.State
 		/// <param name="sessionState"></param>
 		protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
 		{
+            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
+            // recovery completes.
+
+	        ConnectionState connectionState = connectionStates[sessionState.Info.SessionId.ParentId];
+			bool connectionInterruptionProcessingComplete =
+                connectionState.ConnectionInterruptProcessingComplete;
+
 			// Restore the session's consumers
 			foreach(ConsumerState consumerState in sessionState.ConsumerStates)
 			{
-				transport.Oneway(consumerState.Info);
+                ConsumerInfo infoToSend = consumerState.Info;
+
+                if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0)
+                {
+                    infoToSend = consumerState.Info.Clone() as ConsumerInfo;
+                    connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+                    infoToSend.PrefetchSize = 0;
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("restore consumer: " + infoToSend.ConsumerId +
+                                     " in pull mode pending recovery, overriding prefetch: " +
+                                     consumerState.Info.PrefetchSize);
+                    }
+                }
+
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("restore consumer: " + infoToSend.ConsumerId);
+                }
+
+                transport.Oneway(infoToSend);
 			}
 		}
 
@@ -189,7 +217,6 @@ namespace Apache.NMS.ActiveMQ.State
 		protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
 		{
 			// Restore the session's producers
-
 			foreach(ProducerState producerState in sessionState.ProducerStates)
 			{
 				transport.Oneway(producerState.Info);
@@ -652,5 +679,49 @@ namespace Apache.NMS.ActiveMQ.State
 				_maxCacheSize = value;
 			}
 		}
+
+	    public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
+		{
+	        ConnectionState connectionState = connectionStates[connectionId];
+	        if(connectionState != null) 
+			{
+	            connectionState.ConnectionInterruptProcessingComplete = true;
+	            
+				Dictionary<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.RecoveringPullConsumers;
+				foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in stalledConsumers)
+				{
+	                ConsumerControl control = new ConsumerControl();
+	                control.ConsumerId = entry.Key;
+	                control.Prefetch = entry.Value.PrefetchSize;
+	                control.Destination = entry.Value.Destination;
+	                try 
+					{
+	                    if(Tracer.IsDebugEnabled) 
+						{
+	                        Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+							             " with: " + control.Prefetch);
+	                    }
+	                    transport.Oneway(control);
+	                } 
+					catch(Exception ex)
+					{
+	                    if(Tracer.IsDebugEnabled) 
+						{
+	                        Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+	                                     " with: " + control.Prefetch + "Error: " + ex.Message);
+	                    }
+	                }
+	            }
+	            stalledConsumers.Clear();
+	        }
+	    }
+		
+	    public void TransportInterrupted() 
+		{
+	        foreach(ConnectionState connectionState in connectionStates.Values) 
+			{
+	            connectionState.ConnectionInterruptProcessingComplete = false;
+	        }
+	    }		
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=961505&r1=961504&r2=961505&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Wed Jul  7 20:50:17 2010
@@ -370,16 +370,19 @@ namespace Apache.NMS.ActiveMQ.Transport.
                     failedConnectTransportURI = ConnectedTransportURI;
                     ConnectedTransportURI = null;
                     connected = false;
+					
+					stateTracker.TransportInterrupted();
+					
+	                if(this.Interrupted != null)
+	                {
+	                    this.Interrupted(transport);
+	                }
+					
                     if(reconnectOk)
                     {
                         reconnectTask.Wakeup();
                     }
-                }
-
-                if(this.Interrupted != null)
-                {
-                    this.Interrupted(transport);
-                }
+				}
             }
         }
 
@@ -1155,7 +1158,16 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
             return false;
         }
-        
+
+        public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
+        {
+            lock(reconnectMutex)
+            {
+                Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId);
+                stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
+            }
+        }
+
         public void Dispose()
         {
             Dispose(true);



Mime
View raw message