activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1059208 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: Connection.cs MessageConsumer.cs Session.cs TransactionContext.cs Transport/InactivityMonitor.cs Transport/TransportFilter.cs
Date Fri, 14 Jan 2011 23:37:23 GMT
Author: tabish
Date: Fri Jan 14 23:37:23 2011
New Revision: 1059208

URL: http://svn.apache.org/viewvc?rev=1059208&view=rev
Log:
https://issues.apache.org/jira/browse/AMQNET-290

Refactor how exceptions are dispatched in the Connection class to exception listeners, now
done in a separate thread to prevent lock contention as the Transport is Disposed.  The Session
is updated to not enlist a rollback operation if in a .NET style TX when the session is closed
inside the TransactionScope.  Message Consumer adds a field to track failure of its parent
connection so that calls to receive break when done after the Connection has received an OnException
call.  When an error occurs on .NET transactions during Commit or Rollback its treated as
an error that causes the parent connection to be closed as the Connection needs to be recreated
so that an attempt to recover the TX can be done on startup.  

Adds a small fix to attempt to get the InactivityMonitor to stop leaking Timer threads, seems
that the Timer class is sensitive and if Disoposed to quickly after creation / start it won't
be shutdown.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1059208&r1=1059207&r2=1059208&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Fri Jan
14 23:37:23 2011
@@ -57,9 +57,11 @@ namespace Apache.NMS.ActiveMQ
         private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
         private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
         private readonly object myLock = new object();
-        private bool connected = false;
-        private bool closed = false;
-        private bool closing = false;
+        private readonly Atomic<bool> connected = new Atomic<bool>(false);
+        private readonly Atomic<bool> closed = new Atomic<bool>(false);
+        private readonly Atomic<bool> closing = new Atomic<bool>(false);
+        private readonly Atomic<bool> transportFailed = new Atomic<bool>(false);
+        private Exception firstFailureError = null;
         private int sessionCounter = 0;
         private int temporaryDestinationCounter = 0;
         private int localTransactionCounter;
@@ -80,7 +82,7 @@ namespace Apache.NMS.ActiveMQ
 
             this.transport = transport;
             this.transport.Command = new CommandHandler(OnCommand);
-            this.transport.Exception = new ExceptionHandler(OnException);
+            this.transport.Exception = new ExceptionHandler(OnTransportException);
             this.transport.Interrupted = new InterruptedHandler(OnTransportInterrupted);
             this.transport.Resumed = new ResumedHandler(OnTransportResumed);
 
@@ -283,6 +285,16 @@ namespace Apache.NMS.ActiveMQ
             set { this.transport = value; }
         }
 
+        public bool TransportFailed
+        {
+            get { return this.transportFailed.Value; }
+        }
+
+        public Exception FirstFailureError
+        {
+            get { return this.firstFailureError; }
+        }
+
         public TimeSpan RequestTimeout
         {
             get { return this.requestTimeout; }
@@ -309,7 +321,7 @@ namespace Apache.NMS.ActiveMQ
             get { return info.ClientId; }
             set
             {
-                if(this.connected)
+                if(this.connected.Value)
                 {
                     throw new NMSException("You cannot change the ClientId once the Connection
is connected");
                 }
@@ -455,7 +467,7 @@ namespace Apache.NMS.ActiveMQ
 
         internal void AddSession(Session session)
         {
-            if(!this.closing)
+            if(!this.closing.Value)
             {
                 sessions.Add(session);
             }
@@ -463,7 +475,7 @@ namespace Apache.NMS.ActiveMQ
 
         internal void RemoveSession(Session session)
         {
-            if(!this.closing)
+            if(!this.closing.Value)
             {
                 sessions.Remove(session);
             }
@@ -471,7 +483,7 @@ namespace Apache.NMS.ActiveMQ
 
         internal void addDispatcher( ConsumerId id, IDispatcher dispatcher )
         {
-            if(!this.closing)
+            if(!this.closing.Value)
             {
                 this.dispatchers.Add( id, dispatcher );
             }
@@ -479,7 +491,7 @@ namespace Apache.NMS.ActiveMQ
 
         internal void removeDispatcher( ConsumerId id )
         {
-            if(!this.closing)
+            if(!this.closing.Value)
             {
                 this.dispatchers.Remove( id );
             }
@@ -487,7 +499,7 @@ namespace Apache.NMS.ActiveMQ
 
         internal void addProducer( ProducerId id, MessageProducer producer )
         {
-            if(!this.closing)
+            if(!this.closing.Value)
             {
                 this.producers.Add( id, producer );
             }
@@ -495,7 +507,7 @@ namespace Apache.NMS.ActiveMQ
 
         internal void removeProducer( ProducerId id )
         {
-            if(!this.closing)
+            if(!this.closing.Value)
             {
                 this.producers.Remove( id );
             }
@@ -505,7 +517,7 @@ namespace Apache.NMS.ActiveMQ
         {
             lock(myLock)
             {
-                if(this.closed)
+                if(this.closed.Value)
                 {
                     return;
                 }
@@ -513,7 +525,7 @@ namespace Apache.NMS.ActiveMQ
                 try
                 {
                     Tracer.Info("Closing Connection.");
-                    this.closing = true;
+                    this.closing.Value = true;
                     lock(sessions.SyncRoot)
                     {
                         foreach(Session session in sessions)
@@ -523,7 +535,7 @@ namespace Apache.NMS.ActiveMQ
                     }
                     sessions.Clear();
 
-                    if(connected)
+                    if(connected.Value)
                     {
                         DisposeOf(ConnectionId);
                         ShutdownInfo shutdowninfo = new ShutdownInfo();
@@ -540,9 +552,9 @@ namespace Apache.NMS.ActiveMQ
                 finally
                 {
                     this.transport = null;
-                    this.closed = true;
-                    this.connected = false;
-                    this.closing = false;
+                    this.closed.Value = true;
+                    this.connected.Value = false;
+                    this.closing.Value = false;
                 }
             }
         }
@@ -643,7 +655,7 @@ namespace Apache.NMS.ActiveMQ
                 if(asyncClose)
                 {
                     Tracer.Info("Asynchronously disposing of Connection.");
-                    if(connected)
+                    if(connected.Value)
                     {
                         transport.Oneway(command);
                     }
@@ -667,24 +679,24 @@ namespace Apache.NMS.ActiveMQ
 
         internal void CheckConnected()
         {
-            if(closed)
+            if(closed.Value)
             {
                 throw new ConnectionClosedException();
             }
 
-            if(!connected)
+            if(!connected.Value)
             {
                 if(!this.userSpecifiedClientID)
                 {
                     this.info.ClientId = this.clientIdGenerator.GenerateId();
                 }
 
-                connected = true;
+                connected.Value = true;
                 // now lets send the connection and see if we get an ack/nak
                 if(null == SyncRequest(info))
                 {
-                    closed = true;
-                    connected = false;
+                    closed.Value = true;
+                    connected.Value = false;
                     throw new ConnectionClosedException();
                 }
             }
@@ -717,9 +729,9 @@ namespace Apache.NMS.ActiveMQ
             }
             else if(command.IsShutdownInfo)
             {
-                if(!closing && !closed)
+                if(!closing.Value && !closed.Value)
                 {
-                    OnException(commandTransport, new NMSException("Broker closed this connection."));
+                    OnException(new NMSException("Broker closed this connection."));
                 }
             }
             else if(command.IsProducerAck)
@@ -741,7 +753,7 @@ namespace Apache.NMS.ActiveMQ
             }
             else if(command.IsConnectionError)
             {
-                if(!closing && !closed)
+                if(!closing.Value && !closed.Value)
                 {
                     ConnectionError connectionError = (ConnectionError) command;
                     BrokerError brokerError = connectionError.Exception;
@@ -757,7 +769,7 @@ namespace Apache.NMS.ActiveMQ
                         }
                     }
 
-                    OnException(commandTransport, new NMSConnectionException(message, cause));
+                    OnException(new NMSConnectionException(message, cause));
                 }
             }
             else
@@ -799,7 +811,7 @@ namespace Apache.NMS.ActiveMQ
 
             try
             {
-                if(connected)
+                if(connected.Value)
                 {
                     Tracer.Info("Returning KeepAliveInfo Response.");
                     info.ResponseRequired = false;
@@ -808,30 +820,98 @@ namespace Apache.NMS.ActiveMQ
             }
             catch(Exception ex)
             {
-                if(!closing && !closed)
+                if(!closing.Value && !closed.Value)
                 {
-                    OnException(commandTransport, ex);
+                    OnException(ex);
                 }
             }
         }
 
-        protected void OnException(ITransport sender, Exception exception)
+        internal void OnAsyncException(Exception error)
+        {
+            if(!this.closed.Value && !this.closing.Value)
+            {
+                if(this.ExceptionListener != null)
+                {
+                    if(!(error is NMSException))
+                    {
+                        error = NMSExceptionSupport.Create(error);
+                    }
+                    NMSException e = (NMSException)error;
+
+                    // Called in another thread so that processing can continue
+                    // here, ensures no lock contention.
+                    ThreadPool.QueueUserWorkItem(AsyncCallExceptionListener, e);
+                }
+                else
+                {
+                    Tracer.Debug("Async exception with no exception listener: " + error);
+                }
+            }
+        }
+
+        private void AsyncCallExceptionListener(object error)
+        {
+            NMSException exception = error as NMSException;
+            this.ExceptionListener(exception);
+        }
+
+        internal void OnTransportException(ITransport source, Exception cause)
+        {
+            this.OnException(cause);
+        }
+
+        internal void OnException(Exception error)
         {
+            OnAsyncException(error);
+
+            if(!this.closing.Value && !this.closed.Value)
+            {
+                // Perform the actual work in another thread to avoid lock contention
+                // and allow the caller to continue on in its error cleanup.
+                ThreadPool.QueueUserWorkItem(AsyncOnExceptionHandler, error);
+            }
+        }
+
+        private void AsyncOnExceptionHandler(object error)
+        {
+            Exception cause = error as Exception;
+
+            MarkTransportFailed(cause);
+
+            try
+            {
+                this.transport.Dispose();
+            }
+            catch(Exception ex)
+            {
+                Tracer.Debug("Caught Exception While disposing of Transport: " + ex);
+            }
+
             this.brokerInfoReceived.countDown();
 
-            if(ExceptionListener != null && !this.closing)
+            foreach(Session session in this.sessions)
             {
                 try
                 {
-                    ExceptionListener(exception);
+                    session.Dispose();
                 }
-                catch
+                catch(Exception ex)
                 {
-                    sender.Dispose();
+                    Tracer.Debug("Caught Exception While disposing of Sessions: " + ex);
                 }
             }
         }
 
+        private void MarkTransportFailed(Exception error)
+        {
+            this.transportFailed.Value = true;
+            if(this.firstFailureError == null)
+            {
+                this.firstFailureError = error;
+            }
+        }
+
         protected void OnTransportInterrupted(ITransport sender)
         {
             Tracer.Debug("Connection: Transport has been Interrupted.");
@@ -857,7 +937,7 @@ namespace Apache.NMS.ActiveMQ
                 }
             }
 
-            if(this.ConnectionInterruptedListener != null && !this.closing )
+            if(this.ConnectionInterruptedListener != null && !this.closing.Value)
             {
                 try
                 {
@@ -873,7 +953,7 @@ namespace Apache.NMS.ActiveMQ
         {
             Tracer.Debug("Transport has resumed normal operation.");
 
-            if(this.ConnectionResumedListener != null && !this.closing )
+            if(this.ConnectionResumedListener != null && !this.closing.Value)
             {
                 try
                 {
@@ -967,7 +1047,7 @@ namespace Apache.NMS.ActiveMQ
             CountDownLatch cdl = this.transportInterruptionProcessingComplete;
             if(cdl != null)
             {
-                if(!closed && cdl.Remaining > 0)
+                if(!closed.Value && cdl.Remaining > 0)
                 {
                     Tracer.Warn("dispatch paused, waiting for outstanding dispatch interruption
" +
                                 "processing (" + cdl.Remaining + ") to complete..");

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1059208&r1=1059207&r2=1059208&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Fri Jan 14 23:37:23 2011
@@ -60,6 +60,8 @@ namespace Apache.NMS.ActiveMQ
 		private bool clearDispatchList = false;
 		private bool inProgressClearRequiredFlag;
 
+        private Exception failureError;
+
 		private const int DEFAULT_REDELIVERY_DELAY = 0;
 		private const int DEFAULT_MAX_REDELIVERIES = 5;
 
@@ -170,6 +172,12 @@ namespace Apache.NMS.ActiveMQ
 			set { ignoreExpiration = value; }
 		}
 
+        public Exception FailureError
+        {
+            get { return this.failureError; }
+            set { this.failureError = value; }
+        }
+
 		#endregion
 
 		#region IMessageConsumer Members
@@ -700,6 +708,13 @@ namespace Apache.NMS.ActiveMQ
 					}
 					else
 					{
+                        // Informs the caller of an error in the event that an async exception
+                        // took down the parent connection.
+                        if(this.failureError != null)
+                        {
+                            throw NMSExceptionSupport.Create(this.failureError);
+                        }
+
 						return null;
 					}
 				}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1059208&r1=1059207&r2=1059208&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Jan
14 23:37:23 2011
@@ -349,6 +349,7 @@ namespace Apache.NMS.ActiveMQ
                     {
                         foreach(MessageConsumer consumer in consumers.Values)
                         {
+                            consumer.FailureError = this.connection.FirstFailureError;
                             consumer.DoClose();
                             this.lastDeliveredSequenceId =
                                 Math.Min(this.lastDeliveredSequenceId, consumer.LastDeliveredSequenceId);
@@ -366,7 +367,7 @@ namespace Apache.NMS.ActiveMQ
                     producers.Clear();
 
                     // If in a transaction roll it back
-                    if(this.IsTransacted && this.transactionContext.InTransaction)
+                    if(this.IsTransacted && this.transactionContext.InLocalTransaction)
                     {
                         try
                         {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1059208&r1=1059207&r2=1059208&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
Fri Jan 14 23:37:23 2011
@@ -53,7 +53,12 @@ namespace Apache.NMS.ActiveMQ
         {
             get{ return this.transactionId != null; }
         }
-        
+
+        public bool InLocalTransaction
+        {
+            get{ return this.transactionId != null && this.currentEnlistment == null;
}
+        }
+
         public TransactionId TransactionId
         {
             get { return transactionId; }
@@ -323,7 +328,7 @@ namespace Apache.NMS.ActiveMQ
                 Tracer.Debug("Transaction Commit failed with error: " + ex.Message);
                 AfterRollback();
                 enlistment.Done();
-                this.session.Connection.OnSessionException(this.session, ex);
+                this.session.Connection.OnException(ex);
             }
             finally
             {
@@ -364,7 +369,7 @@ namespace Apache.NMS.ActiveMQ
                 Tracer.Debug("Transaction Single Phase Commit failed with error: " + ex.Message);
                 AfterRollback();
                 enlistment.Done();
-                this.session.Connection.OnSessionException(this.session, ex);
+                this.session.Connection.OnException(ex);
             }
             finally
             {
@@ -406,7 +411,7 @@ namespace Apache.NMS.ActiveMQ
                 Tracer.Debug("Transaction Rollback failed with error: " + ex.Message);
                 AfterRollback();
                 enlistment.Done();
-                this.session.Connection.OnSessionException(this.session, ex);
+                this.session.Connection.OnException(ex);
             }
             finally
             {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1059208&r1=1059207&r2=1059208&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
Fri Jan 14 23:37:23 2011
@@ -108,8 +108,14 @@ namespace Apache.NMS.ActiveMQ.Transport
                 // get rid of unmanaged stuff
             }
 
-            this.disposing = true;
-            StopMonitorThreads();
+            lock(monitor)
+            {
+                this.localWireFormatInfo = null;
+                this.remoteWireFormatInfo = null;
+                this.disposing = true;
+                StopMonitorThreads();
+            }
+
             base.Dispose(disposing);
         }
 		
@@ -364,7 +370,7 @@ namespace Apache.NMS.ActiveMQ.Transport
                     // Attempt to wait for the Timer to shutdown, but don't wait
                     // forever, if they don't shutdown after two seconds, just quit.
                     this.connectionCheckTimer.Dispose(shutdownEvent);
-                    if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000), false))
+                    if(!shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(3000), false))
                     {
                         Tracer.WarnFormat("InactivityMonitor[{0}]: Timer Task didn't shutdown
properly.", instanceId);
                     }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=1059208&r1=1059207&r2=1059208&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
Fri Jan 14 23:37:23 2011
@@ -149,7 +149,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 
 		protected virtual void Dispose(bool disposing)
 		{
-			if(disposing)
+			if(disposing && !disposed)
 			{
                 Tracer.Debug("TransportFilter disposing of next Transport: " +
                              this.next.GetType().Name);



Mime
View raw message