activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r693516 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: ./ Transport/ Transport/Tcp/
Date Tue, 09 Sep 2008 17:08:10 GMT
Author: jgomes
Date: Tue Sep  9 10:08:09 2008
New Revision: 693516

URL: http://svn.apache.org/viewvc?rev=693516&view=rev
Log:
Apply patch from Chris Fraire.  Thanks, Chris!
Fixes [AMQNET-112]. (See https://issues.apache.org/activemq/browse/AMQNET-112)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Sep  9 10:08:09 2008
@@ -20,6 +20,7 @@
 using Apache.NMS.Util;
 using System;
 using System.Collections;
+using System.Threading;
 
 namespace Apache.NMS.ActiveMQ
 {
@@ -35,18 +36,21 @@
 		private TimeSpan requestTimeout;
 		private BrokerInfo brokerInfo; // from broker
 		private WireFormatInfo brokerWireFormatInfo; // from broker
-		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
+		private readonly IList sessions = new ArrayList();
+		/// <summary>
+		/// Private object used for synchronization, instead of public "this"
+		/// </summary>
+		private readonly object myLock = new object();
 		private bool asyncSend = false;
 		private bool asyncClose = true;
-		private bool connected = false;
-		private bool closed = false;
-		private bool closing = false;
+		private volatile bool closed;
+		private volatile bool triedConnect;
 		private long sessionCounter = 0;
 		private long temporaryDestinationCounter = 0;
 		private long localTransactionCounter;
-		private readonly AtomicBoolean started = new AtomicBoolean(false);
+		private bool started = false;
 		private bool disposed = false;
-		
+
 		public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
 		{
 			this.brokerUri = connectionUri;
@@ -107,11 +111,15 @@
 		public void Start()
 		{
 			CheckConnected();
-			if(started.CompareAndSet(false, true))
+			lock(myLock)
 			{
-				foreach(Session session in sessions)
+				if(!started)
 				{
-					session.StartAsyncDelivery(null);
+					started = true;
+					foreach(Session session in sessions)
+					{
+						session.StartAsyncDelivery(null);
+					}
 				}
 			}
 		}
@@ -122,7 +130,7 @@
 		/// </summary>
 		public bool IsStarted
 		{
-			get { return started.Value; }
+			get { return started; }
 		}
 
 		/// <summary>
@@ -132,15 +140,19 @@
 		public void Stop()
 		{
 			CheckConnected();
-			if(started.CompareAndSet(true, false))
+			lock(myLock)
 			{
-				foreach(Session session in sessions)
+				if(started)
 				{
-					session.StopAsyncDelivery();
+					started = false;
+					foreach(Session session in sessions)
+					{
+						session.StopAsyncDelivery();
+					}
 				}
 			}
 		}
-		
+
 		/// <summary>
 		/// Creates a new session to work on this connection
 		/// </summary>
@@ -162,20 +174,22 @@
 			System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(this.brokerUri.Query);
 			URISupport.SetProperties(session, map, "session.");
 
-			if(IsStarted)
+			lock(myLock)
 			{
-				session.StartAsyncDelivery(null);
-			}
+				if(IsStarted)
+				{
+					session.StartAsyncDelivery(null);
+				}
 
-			sessions.Add(session);
+				sessions.Add(session);
+			}
 			return session;
 		}
 
 		public void RemoveSession(Session session)
 		{
 			DisposeOf(session.SessionId);
-
-			if(!closing)
+			lock(myLock)
 			{
 				sessions.Remove(session);
 			}
@@ -183,7 +197,26 @@
 
 		public void Close()
 		{
-			lock(this)
+			if(closed)
+			{
+				return;
+			}
+
+			//
+			// Do a first-run close of sessions after brief synchronization
+			//
+			IList sessionsCopy;
+			lock(myLock)
+			{
+				sessionsCopy = new ArrayList(sessions);
+			}
+
+			foreach(Session session in sessionsCopy)
+			{
+				session.Close();
+			}
+
+			lock(myLock)
 			{
 				if(closed)
 				{
@@ -192,8 +225,11 @@
 
 				try
 				{
-					closing = true;
-					foreach(Session session in sessions)
+					//
+					// Copy again for safe enumeration.  Always assume
+					// that closing a session modifies the sessions list.
+					//
+					foreach(Session session in new ArrayList(sessions))
 					{
 						session.Close();
 					}
@@ -207,10 +243,11 @@
 				{
 					Tracer.ErrorFormat("Error during connection close: {0}", ex);
 				}
-
-				transport = null;
-				closed = true;
-				closing = false;
+				finally
+				{
+					closed = true;
+					transport = null;
+				}
 			}
 		}
 
@@ -246,14 +283,14 @@
 
 			disposed = true;
 		}
-		
+
 		// Properties
 
 		public Uri BrokerUri
 		{
 			get { return brokerUri; }
 		}
-		
+
 		public ITransport ITransport
 		{
 			get { return transport; }
@@ -271,41 +308,41 @@
 			get { return acknowledgementMode; }
 			set { this.acknowledgementMode = value; }
 		}
-		
+
 		public string ClientId
 		{
 			get { return info.ClientId; }
 			set
 			{
-				if(connected)
+				if(triedConnect)
 				{
 					throw new NMSException("You cannot change the ClientId once the Connection is connected");
 				}
 				info.ClientId = value;
 			}
 		}
-		
+
 		public ConnectionId ConnectionId
 		{
 			get { return info.ConnectionId; }
 		}
-		
+
 		public BrokerInfo BrokerInfo
 		{
 			get { return brokerInfo; }
 		}
-		
+
 		public WireFormatInfo BrokerWireFormat
 		{
 			get { return brokerWireFormatInfo; }
 		}
-		
+
 		// Implementation methods
 
 		/// <summary>
 		/// Performs a synchronous request-response with the broker
 		/// </summary>
-		/// 
+		///
 
 		public Response SyncRequest(Command command)
 		{
@@ -360,12 +397,11 @@
 		/// </summary>
 		public String CreateTemporaryDestinationName()
 		{
-			lock(this)
-			{
-				return info.ConnectionId.Value + ":" + (++temporaryDestinationCounter);
-			}
+			return info.ConnectionId.Value
+				+ ":"
+				+ Interlocked.Increment(ref temporaryDestinationCounter);
 		}
-		
+
 		/// <summary>
 		/// Creates a new local transaction ID
 		/// </summary>
@@ -373,13 +409,10 @@
 		{
 			LocalTransactionId id= new LocalTransactionId();
 			id.ConnectionId = ConnectionId;
-			lock(this)
-			{
-				id.Value = (++localTransactionCounter);
-			}
+			id.Value = Interlocked.Increment(ref localTransactionCounter);
 			return id;
 		}
-		
+
 		protected void CheckConnected()
 		{
 			if(closed)
@@ -387,19 +420,35 @@
 				throw new ConnectionClosedException();
 			}
 
-			if(!connected)
+			if(triedConnect)
 			{
-				connected = true;
+				return;
+			}
+
+			lock(myLock)
+			{
+				if(closed)
+				{
+					throw new ConnectionClosedException();
+				}
+
+				if(triedConnect)
+				{
+					return;
+				}
+
+				// Set this in advance, to short-circuit SyncRequest's call to this method
+				triedConnect = true;
+
 				// now lets send the connection and see if we get an ack/nak
 				if(null == SyncRequest(info))
 				{
 					closed = true;
-					connected = false;
 					throw new ConnectionClosedException();
 				}
 			}
 		}
-		
+
 		/// <summary>
 		/// Handle incoming commands
 		/// </summary>
@@ -422,9 +471,12 @@
 			else if(command is ShutdownInfo)
 			{
 				//ShutdownInfo info = (ShutdownInfo)command;
-				if(!closing && !closed)
+				lock(myLock)
 				{
-					OnException(commandTransport, new NMSException("Broker closed this connection."));
+					if(!closed)
+					{
+						OnException(commandTransport, new NMSException("Broker closed this connection."));
+					}
 				}
 			}
 			else
@@ -437,12 +489,15 @@
 		{
 			bool dispatched = false;
 
-			foreach(Session session in sessions)
+			lock(myLock)
 			{
-				if(session.DispatchMessage(dispatch.ConsumerId, dispatch.Message))
+				foreach(Session session in sessions)
 				{
-					dispatched = true;
-					break;
+					if(session.DispatchMessage(dispatch.ConsumerId, dispatch.Message))
+					{
+						dispatched = true;
+						break;
+					}
 				}
 			}
 
@@ -483,19 +538,16 @@
 				}
 			}
 		}
-		
+
 		protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode)
 		{
 			SessionInfo answer = new SessionInfo();
 			SessionId sessionId = new SessionId();
 			sessionId.ConnectionId = info.ConnectionId.Value;
-			lock(this)
-			{
-				sessionId.Value = ++sessionCounter;
-			}
+			sessionId.Value = Interlocked.Increment(ref sessionCounter);
 			answer.SessionId = sessionId;
 			return answer;
 		}
-		
+
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs Tue Sep  9 10:08:09 2008
@@ -27,7 +27,7 @@
 		public delegate void ExceptionHandler(Exception exception);
 
 		private readonly AutoResetEvent m_event = new AutoResetEvent(false);
-		private bool m_bStopFlag = false;
+		private readonly ManualResetEvent m_stopEvent = new ManualResetEvent(false);
 		private Thread m_thread = null;
 		private readonly DispatchFunction m_dispatchFunc;
 		private event ExceptionHandler m_exceptionListener;
@@ -37,7 +37,18 @@
 			m_dispatchFunc = dispatchFunc;
 		}
 
-               // TODO can't use EventWaitHandle on MONO 1.0
+		public bool IsStarted
+		{
+			get
+			{
+				lock(this)
+				{
+					return (null != m_thread);
+				}
+			}
+		}
+
+			   // TODO can't use EventWaitHandle on MONO 1.0
 		public AutoResetEvent EventHandle
 		{
 			get { return m_event; }
@@ -49,7 +60,7 @@
 			{
 				m_exceptionListener += value;
 			}
-			remove 
+			remove
 			{
 				m_exceptionListener -= value;
 			}
@@ -61,10 +72,9 @@
 			{
 				if (m_thread == null)
 				{
-					m_bStopFlag = false;
+					m_stopEvent.Reset();
 					m_thread = new Thread(new ThreadStart(MyThreadFunc));
 					m_thread.IsBackground = true;
-					m_event.Set();
 					Tracer.Info("Starting dispatcher thread for session");
 					m_thread.Start();
 				}
@@ -76,20 +86,16 @@
 			Stop(System.Threading.Timeout.Infinite);
 		}
 
-		
+
 		internal void Stop(int timeoutMilliseconds)
 		{
 			Tracer.Info("Stopping dispatcher thread for session");
-			Thread localThread = null;
+			Thread localThread;
 			lock (this)
 			{
 				localThread = m_thread;
 				m_thread = null;
-				if (!m_bStopFlag)
-				{
-					m_bStopFlag = true;
-					m_event.Set();
-				}
+				m_stopEvent.Set();
 			}
 			if(localThread!=null)
 			{
@@ -101,30 +107,54 @@
 			}
 			Tracer.Info("Dispatcher thread joined");
 		}
-		
+
 		private void MyThreadFunc()
 		{
 			Tracer.Info("Dispatcher thread started");
-			while (true) // loop forever (well, at least until we've been asked to stop)
-			{
-				lock (this)
-				{
-					if (m_bStopFlag)
-						break;
-				}
 
-				try
-				{
-					m_dispatchFunc();
-				}
-				catch (Exception ex)
-				{
-					if (m_exceptionListener != null)
-						m_exceptionListener(ex);
+			//
+			// Put m_stopEvent first so it is preferred if both are signaled
+			//
+			WaitHandle[] signals = new WaitHandle[] {
+				m_stopEvent,
+				m_event
+			};
+			const int kStopEventOffset = 0;
+
+			try
+			{
+				while (true) // loop forever (well, at least until we've been asked to stop)
+				{
+					try
+					{
+						m_dispatchFunc();
+					}
+					catch(ThreadAbortException)
+					{
+						// Throw for handling down below
+						throw;
+					}
+					catch(Exception ex)
+					{
+						if(m_exceptionListener != null)
+						{
+							m_exceptionListener(ex);
+						}
+					}
+
+					int sigOffset = WaitHandle.WaitAny(signals);
+					if(kStopEventOffset == sigOffset)
+					{
+						break;
+					}
+					// otherwise, continue the loop
 				}
-				m_event.WaitOne();
+				Tracer.Info("Dispatcher thread stopped");
+			}
+			catch(ThreadAbortException)
+			{
+				Tracer.Info("Dispatcher thread aborted");
 			}
-			Tracer.Info("Dispatcher thread stopped");
 		}
 	}
-}
\ No newline at end of file
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Tue Sep  9 10:08:09 2008
@@ -144,11 +144,7 @@
 			{
 				MessageId id = new MessageId();
 				id.ProducerId = info.ProducerId;
-				lock (this)
-				{
-					id.ProducerSequenceId = ++messageCounter;
-				}
-
+				id.ProducerSequenceId = Interlocked.Increment(ref messageCounter);
 				activeMessage.MessageId = id;
 			}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Tue Sep  9 10:08:09 2008
@@ -16,8 +16,10 @@
  */
 using System;
 using System.Collections;
+using System.Threading;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ
 {
@@ -27,15 +29,20 @@
 	public class Session : ISession
 	{
 		private long consumerCounter;
-		private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
-		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+		private readonly IDictionary consumers = new Hashtable();
+		private readonly IDictionary producers = new Hashtable();
 		private readonly DispatchingThread dispatchingThread;
+		/// <summary>
+		/// Private object used for synchronization, instead of public "this"
+		/// </summary>
+		private readonly object myLock = new object();
 		private DispatchingThread.ExceptionHandler dispatchingThread_ExceptionHandler;
 		private readonly SessionInfo info;
 		private long producerCounter;
-		internal bool startedAsyncDelivery = false;
 		private bool disposed = false;
-        private bool closed = false;
+		private volatile bool closed = false;
+
+		private const int kMAX_STOP_ASYNC_MS = 30000;
 
 		public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
 		{
@@ -146,9 +153,37 @@
 
 		public void Close()
 		{
-			lock(this)
+			if(closed)
+			{
+				return;
+			}
+
+			//
+			// Do a first-run close of consumer and producers after
+			// brief synchronization
+			//
+			IList consumersCopy;
+			IList producersCopy;
+
+			lock(myLock)
 			{
-				if(this.closed)
+				consumersCopy = new ArrayList(consumers.Values);
+				producersCopy = new ArrayList(producers.Values);
+			}
+
+			foreach(MessageConsumer consumer in consumersCopy)
+			{
+				consumer.Close();
+			}
+
+			foreach(MessageProducer producer in producersCopy)
+			{
+				producer.Close();
+			}
+
+			lock(myLock)
+			{
+				if(closed)
 				{
 					return;
 				}
@@ -156,26 +191,30 @@
 				try
 				{
 					StopAsyncDelivery();
-					Connection.RemoveSession(this);
-					foreach(MessageConsumer consumer in GetConsumers())
+					// Copy again for safe enumeration
+					foreach(MessageConsumer consumer in new ArrayList(consumers.Values))
 					{
 						consumer.Close();
 					}
 					consumers.Clear();
 
-					foreach(MessageProducer producer in GetProducers())
+					// Copy again for safe enumeration
+					foreach(MessageProducer producer in new ArrayList(producers.Values))
 					{
 						producer.Close();
 					}
 					producers.Clear();
+					Connection.RemoveSession(this);
 				}
-				catch(Exception ex)
+				catch (Exception ex)
 				{
 					Tracer.ErrorFormat("Error during session close: {0}", ex);
 				}
-
-				this.connection = null;
-				this.closed = true;
+				finally
+				{
+					this.connection = null;
+					this.closed = true;
+				}
 			}
 		}
 
@@ -188,19 +227,24 @@
 		{
 			ProducerInfo command = CreateProducerInfo(destination);
 			ProducerId producerId = command.ProducerId;
-			MessageProducer producer = null;
+			MessageProducer producer = new MessageProducer(this, command);
+			lock(myLock)
+			{
+				producers[producerId] = producer;
+			}
 
 			try
 			{
-				producer = new MessageProducer(this, command);
-				producers[producerId] = producer;
 				this.DoSend(command);
 			}
 			catch(Exception)
 			{
-				if(producer != null)
+				//
+				// DoSend failed.  No need to call MessageProducer.Close
+				//
+				lock(myLock)
 				{
-					producer.Close();
+					producers.Remove(producerId);
 				}
 
 				throw;
@@ -228,23 +272,31 @@
 			ConsumerId consumerId = command.ConsumerId;
 			MessageConsumer consumer = null;
 
-			try
+			consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
+			// let's register the consumer first in case we start dispatching messages immediately
+			lock(myLock)
 			{
-				consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
-				// lets register the consumer first in case we start dispatching messages immediately
 				consumers[consumerId] = consumer;
+			}
+
+			try
+			{
 				this.DoSend(command);
-				return consumer;
 			}
 			catch(Exception)
 			{
-				if(consumer != null)
+				//
+				// DoSend failed.  No need to call MessageProducer.Close
+				//
+				lock(myLock)
 				{
-					consumer.Close();
+					consumers.Remove(consumerId);
 				}
 
 				throw;
 			}
+
+			return consumer;
 		}
 
 		public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
@@ -255,18 +307,25 @@
 			command.NoLocal = noLocal;
 			MessageConsumer consumer = null;
 
-			try
+			consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
+			// let's register the consumer first in case we start dispatching messages immediately
+			lock(myLock)
 			{
-				consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
-				// lets register the consumer first in case we start dispatching messages immediately
 				consumers[consumerId] = consumer;
+			}
+
+			try
+			{
 				this.DoSend(command);
 			}
 			catch(Exception)
 			{
-				if(consumer != null)
+				//
+				// DoSend failed; no need to call MessageConsumer.Close
+				//
+				lock(myLock)
 				{
-					consumer.Close();
+					consumers.Remove(consumerId);
 				}
 
 				throw;
@@ -277,11 +336,15 @@
 
 		public void DeleteDurableConsumer(string name)
 		{
+			IConnection conn = this.Connection;
 			RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
-			command.ConnectionId = Connection.ConnectionId;
-			command.ClientId = Connection.ClientId;
-			command.SubcriptionName = name;
+			if(null != conn)
+			{
+				command.ConnectionId = Connection.ConnectionId;
+				command.ClientId = Connection.ClientId;
+			}
 
+			command.SubcriptionName = name;
 			this.DoSend(command);
 		}
 
@@ -378,9 +441,12 @@
 			this.TransactionContext.Rollback();
 
 			// lets ensure all the consumers redeliver any rolled back messages
-			foreach(MessageConsumer consumer in GetConsumers())
+			lock(myLock)
 			{
-				consumer.RedeliverRolledBackMessages();
+				foreach(MessageConsumer consumer in consumers.Values)
+				{
+					consumer.RedeliverRolledBackMessages();
+				}
 			}
 		}
 
@@ -472,19 +538,29 @@
 		public void DisposeOf(ConsumerId objectId)
 		{
 			Connection.DisposeOf(objectId);
-			consumers.Remove(objectId);
+			lock(myLock)
+			{
+				consumers.Remove(objectId);
+			}
 		}
 
 		public void DisposeOf(ProducerId objectId)
 		{
 			Connection.DisposeOf(objectId);
-			producers.Remove(objectId);
+			lock(myLock)
+			{
+				producers.Remove(objectId);
+			}
 		}
 
 		public bool DispatchMessage(ConsumerId consumerId, Message message)
 		{
 			bool dispatched = false;
-			MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
+			MessageConsumer consumer;
+			lock(myLock)
+			{
+				consumer = (MessageConsumer) consumers[consumerId];
+			}
 
 			if(consumer != null)
 			{
@@ -503,9 +579,12 @@
 		{
 			// lets iterate through each consumer created by this session
 			// ensuring that they have all pending messages dispatched
-			foreach(MessageConsumer consumer in GetConsumers())
+			lock(myLock)
 			{
-				consumer.DispatchAsyncMessages();
+				foreach(MessageConsumer consumer in consumers.Values)
+				{
+					consumer.DispatchAsyncMessages();
+				}
 			}
 		}
 
@@ -515,7 +594,7 @@
 		/// </summary>
 		protected ICollection GetConsumers()
 		{
-			lock(consumers.SyncRoot)
+			lock(myLock)
 			{
 				return new ArrayList(consumers.Values);
 			}
@@ -527,7 +606,7 @@
 		/// </summary>
 		protected ICollection GetProducers()
 		{
-			lock(producers.SyncRoot)
+			lock(myLock)
 			{
 				return new ArrayList(producers.Values);
 			}
@@ -539,10 +618,7 @@
 			ConsumerId id = new ConsumerId();
 			id.ConnectionId = info.SessionId.ConnectionId;
 			id.SessionId = info.SessionId.Value;
-			lock(this)
-			{
-				id.Value = ++consumerCounter;
-			}
+			id.Value = Interlocked.Increment(ref consumerCounter);
 			answer.ConsumerId = id;
 			answer.Destination = ActiveMQDestination.Transform(destination);
 			answer.Selector = selector;
@@ -557,7 +633,7 @@
 			ActiveMQDestination amqDestination = destination as ActiveMQDestination;
 			if(amqDestination != null && amqDestination.Options != null)
 			{
-				Util.URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
+				URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
 			}
 
 			return answer;
@@ -569,10 +645,7 @@
 			ProducerId id = new ProducerId();
 			id.ConnectionId = info.SessionId.ConnectionId;
 			id.SessionId = info.SessionId.Value;
-			lock(this)
-			{
-				id.Value = ++producerCounter;
-			}
+			id.Value = Interlocked.Increment(ref producerCounter);
 			answer.ProducerId = id;
 			answer.Destination = ActiveMQDestination.Transform(destination);
 
@@ -581,7 +654,7 @@
 			ActiveMQDestination amqDestination = destination as ActiveMQDestination;
 			if(amqDestination != null && amqDestination.Options != null)
 			{
-				Util.URISupport.SetProperties(answer, amqDestination.Options, "producer.");
+				URISupport.SetProperties(answer, amqDestination.Options, "producer.");
 			}
 
 			return answer;
@@ -596,11 +669,10 @@
 
 		internal void StopAsyncDelivery()
 		{
-			if(startedAsyncDelivery)
+			if(dispatchingThread.IsStarted)
 			{
 				this.dispatchingThread.ExceptionListener -= this.dispatchingThread_ExceptionHandler;
-				dispatchingThread.Stop(5000);
-				startedAsyncDelivery = false;
+				dispatchingThread.Stop(kMAX_STOP_ASYNC_MS);
 			}
 		}
 
@@ -611,11 +683,10 @@
 				dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
 			}
 
-			if(!startedAsyncDelivery)
+			if(!dispatchingThread.IsStarted)
 			{
 				this.dispatchingThread.ExceptionListener += this.dispatchingThread_ExceptionHandler;
 				dispatchingThread.Start();
-				startedAsyncDelivery = true;
 			}
 		}
 	}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/MutexTransport.cs Tue Sep  9 10:08:09 2008
@@ -20,32 +20,32 @@
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
-    /// <summary>
-    /// A Transport which guards access to the next transport using a mutex.
-    /// </summary>
-    public class MutexTransport : TransportFilter
-    {
-        private readonly object transmissionLock = new object();
+	/// <summary>
+	/// A Transport which guards access to the next transport using a mutex.
+	/// </summary>
+	public class MutexTransport : TransportFilter
+	{
+		private readonly object transmissionLock = new object();
 
-        public MutexTransport(ITransport next) : base(next)
+		public MutexTransport(ITransport next) : base(next)
 		{
-        }
-        
-        public override void Oneway(Command command)
-        {
-            lock(transmissionLock)
-            {
-                this.next.Oneway(command);
-            }
-        }
+		}
 
-        public override FutureResponse AsyncRequest(Command command)
-        {
-            lock(transmissionLock)
-            {
-                return base.AsyncRequest(command);
-            }
-        }
+		public override void Oneway(Command command)
+		{
+			lock(transmissionLock)
+			{
+				this.next.Oneway(command);
+			}
+		}
+
+		public override FutureResponse AsyncRequest(Command command)
+		{
+			lock(transmissionLock)
+			{
+				return base.AsyncRequest(command);
+			}
+		}
 
 		public override Response Request(Command command, TimeSpan timeout)
 		{
@@ -54,13 +54,5 @@
 				return base.Request(command, timeout);
 			}
 		}
-
-		public override void Dispose()
-        {
-            lock(transmissionLock)
-            {
-                base.Dispose();
-            }
-        }
-    }
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Tue Sep  9 10:08:09 2008
@@ -17,6 +17,7 @@
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.OpenWire;
 using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.Util;
 using System;
 using System.IO;
 using System.Net.Sockets;
@@ -24,23 +25,23 @@
 
 namespace Apache.NMS.ActiveMQ.Transport.Tcp
 {
-	
+
 	/// <summary>
 	/// An implementation of ITransport that uses sockets to communicate with the broker
 	/// </summary>
 	public class TcpTransport : ITransport
 	{
-		private readonly object initLock = new object();
+		private readonly object myLock = new object();
 		private readonly Socket socket;
 		private IWireFormat wireformat;
 		private BinaryReader socketReader;
 		private BinaryWriter socketWriter;
-		private readonly object socketWriterLock = new object();
 		private Thread readThread;
 		private bool started;
-		private Util.AtomicBoolean closed = new Util.AtomicBoolean(false);
+		private volatile bool closed;
+		private volatile bool seenShutdown;
 		private TimeSpan maxWait = TimeSpan.FromMilliseconds(Timeout.Infinite);
-		
+
 		private CommandHandler commandHandler;
 		private ExceptionHandler exceptionHandler;
 		private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
@@ -51,13 +52,18 @@
 			this.socket = socket;
 			this.wireformat = wireformat;
 		}
-		
+
+		~TcpTransport()
+		{
+			Dispose(false);
+		}
+
 		/// <summary>
 		/// Method Start
 		/// </summary>
 		public void Start()
 		{
-			lock (initLock)
+			lock(myLock)
 			{
 				if (!started)
 				{
@@ -95,22 +101,27 @@
 		{
 			get
 			{
-				lock(initLock)
+				lock(myLock)
 				{
 					return started;
 				}
 			}
 		}
-		
+
 		public void Oneway(Command command)
 		{
-			lock (socketWriterLock)
+			lock(myLock)
 			{
 				try
 				{
-					if(closed.Value)
+					if(closed)
+					{
+						throw new InvalidOperationException("Error writing to broker.  Transport connection is closed.");
+					}
+
+					if(command is ShutdownInfo)
 					{
-						throw new Exception("Error writing to broker.  Transport connection is closed.");
+						seenShutdown = true;
 					}
 
 					Wireformat.Marshal(command, socketWriter);
@@ -134,7 +145,7 @@
 				}
 			}
 		}
-		
+
 		public FutureResponse AsyncRequest(Command command)
 		{
 			throw new NotImplementedException("Use a ResponseCorrelator if you want to issue AsyncRequest calls");
@@ -169,13 +180,20 @@
 		{
 			throw new NotImplementedException("Use a ResponseCorrelator if you want to issue Request calls");
 		}
-		
+
 		public void Close()
 		{
-			if(closed.CompareAndSet(false, true))
+			if(!closed)
 			{
-				lock(initLock)
+				lock(myLock)
 				{
+					if(closed)
+					{
+						return;
+					}
+
+					closed = true;
+
 					try
 					{
 						socket.Shutdown(SocketShutdown.Both);
@@ -186,12 +204,9 @@
 
 					try
 					{
-						lock(socketWriterLock)
+						if(null != socketWriter)
 						{
-							if(null != socketWriter)
-							{
-								socketWriter.Close();
-							}
+							socketWriter.Close();
 						}
 					}
 					catch
@@ -260,9 +275,15 @@
 
 		public void Dispose()
 		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected void Dispose(bool disposing)
+		{
 			Close();
 		}
-		
+
 		public void ReadLoop()
 		{
 			// This is the thread function for the reader thread. This runs continuously
@@ -279,7 +300,7 @@
 			// An exception in the command handler may not be fatal to the transport, so
 			// these are simply reported to the exceptionHandler.
 			//
-			while(!closed.Value)
+			while(!closed)
 			{
 				Command command = null;
 
@@ -290,7 +311,7 @@
 				catch(Exception ex)
 				{
 					command = null;
-					if(!closed.Value)
+					if(!closed && !seenShutdown)
 					{
 						// Close the socket as there's little that can be done with this transport now.
 						Close();
@@ -313,9 +334,9 @@
 				}
 			}
 		}
-				
+
 		// Implementation methods
-				
+
 		public CommandHandler Command
 		{
 			get { return commandHandler; }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Tue Sep  9 10:08:09 2008
@@ -24,28 +24,33 @@
 	/// Used to implement a filter on the transport layer.
 	/// </summary>
 	public class TransportFilter : ITransport
-    {
+	{
 		protected readonly ITransport next;
 		protected CommandHandler commandHandler;
 		protected ExceptionHandler exceptionHandler;
-		
+
 		public TransportFilter(ITransport next)
 		{
 			this.next = next;
 			this.next.Command = new CommandHandler(OnCommand);
 			this.next.Exception = new ExceptionHandler(OnException);
 		}
-		
+
+		~TransportFilter()
+		{
+			Dispose(false);
+		}
+
 		protected virtual void OnCommand(ITransport sender, Command command)
 		{
 			this.commandHandler(sender, command);
 		}
-		
+
 		protected virtual void OnException(ITransport sender, Exception command)
 		{
 			this.exceptionHandler(sender, command);
 		}
-		
+
 		/// <summary>
 		/// Method Oneway
 		/// </summary>
@@ -54,7 +59,7 @@
 		{
 			this.next.Oneway(command);
 		}
-		
+
 		/// <summary>
 		/// Method AsyncRequest
 		/// </summary>
@@ -124,23 +129,32 @@
 		/// <summary>
 		/// Method Dispose
 		/// </summary>
-		public virtual void Dispose()
+		public void Dispose()
 		{
-			this.next.Dispose();
+			Dispose(true);
+			GC.SuppressFinalize(this);
 		}
-		
+
+		protected virtual void Dispose(bool disposing)
+		{
+			if(disposing)
+			{
+				this.next.Dispose();
+			}
+		}
+
 		public CommandHandler Command
 		{
-            get { return commandHandler; }
-            set { this.commandHandler = value; }
-        }
-		
-        public  ExceptionHandler Exception
-		{
-            get { return exceptionHandler; }
-            set { this.exceptionHandler = value; }
-        }
-		
-    }
+			get { return commandHandler; }
+			set { this.commandHandler = value; }
+		}
+
+		public  ExceptionHandler Exception
+		{
+			get { return exceptionHandler; }
+			set { this.exceptionHandler = value; }
+		}
+
+	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs?rev=693516&r1=693515&r2=693516&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs Tue Sep  9 10:08:09 2008
@@ -24,85 +24,84 @@
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
-	
-    /// <summary>
-    /// A Transport which negotiates the wire format
-    /// </summary>
-    public class WireFormatNegotiator : TransportFilter
-    {
-        private OpenWireFormat wireFormat;
-        private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
-    
-        private AtomicBoolean firstStart=new AtomicBoolean(true);
-        private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
-        private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
-
-        public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
-            : base(next)
-        {
-            this.wireFormat = wireFormat;
-        }
-        
-        public override void Start()
-        {
-            base.Start();
-            if (firstStart.CompareAndSet(true, false))
-            {
-                try
-                {
-                    next.Oneway(wireFormat.PreferedWireFormatInfo);
-                }
-                finally
-                {
-                    wireInfoSentDownLatch.countDown();
-                }
-            }
-        }
-        
-        public override void Dispose()
-        {
-        	base.Dispose();
-            readyCountDownLatch.countDown();
-        }
-
-        public override void Oneway(Command command)
-        {
-            if (!readyCountDownLatch.await(negotiateTimeout))
-                throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
-            next.Oneway(command);
-        }
-
-        protected override void OnCommand(ITransport sender, Command command)
-        {
-            if ( command.GetDataStructureType() == WireFormatInfo.ID_WireFormatInfo )
-            {
-                WireFormatInfo info = (WireFormatInfo)command;
-                try
-                {
-                    if (!info.Valid)
-                    {
-                        throw new IOException("Remote wire format magic is invalid");
-                    }
-                    wireInfoSentDownLatch.await(negotiateTimeout);
-                    wireFormat.renegotiateWireFormat(info);
-                }
-                catch (Exception e)
-                {
-                    OnException(this, e);
-                } 
-                finally
-                {
-                    readyCountDownLatch.countDown();
-                }
-            }
-            this.commandHandler(sender, command);
-        }
-
-        protected override void OnException(ITransport sender, Exception command)
-        {
-            readyCountDownLatch.countDown();
-            this.exceptionHandler(sender, command);
-        }
-    }
+	/// <summary>
+	/// A Transport which negotiates the wire format
+	/// </summary>
+	public class WireFormatNegotiator : TransportFilter
+	{
+		private OpenWireFormat wireFormat;
+		private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
+
+		private AtomicBoolean firstStart=new AtomicBoolean(true);
+		private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
+		private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
+
+		public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
+			: base(next)
+		{
+			this.wireFormat = wireFormat;
+		}
+
+		public override void Start()
+		{
+			base.Start();
+			if (firstStart.CompareAndSet(true, false))
+			{
+				try
+				{
+					next.Oneway(wireFormat.PreferedWireFormatInfo);
+				}
+				finally
+				{
+					wireInfoSentDownLatch.countDown();
+				}
+			}
+		}
+
+		protected override void Dispose(bool disposing)
+		{
+			base.Dispose(disposing);
+			readyCountDownLatch.countDown();
+		}
+
+		public override void Oneway(Command command)
+		{
+			if (!readyCountDownLatch.await(negotiateTimeout))
+				throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
+			next.Oneway(command);
+		}
+
+		protected override void OnCommand(ITransport sender, Command command)
+		{
+			if ( command.GetDataStructureType() == WireFormatInfo.ID_WireFormatInfo )
+			{
+				WireFormatInfo info = (WireFormatInfo)command;
+				try
+				{
+					if (!info.Valid)
+					{
+						throw new IOException("Remote wire format magic is invalid");
+					}
+					wireInfoSentDownLatch.await(negotiateTimeout);
+					wireFormat.renegotiateWireFormat(info);
+				}
+				catch (Exception e)
+				{
+					OnException(this, e);
+				}
+				finally
+				{
+					readyCountDownLatch.countDown();
+				}
+			}
+			this.commandHandler(sender, command);
+		}
+
+		protected override void OnException(ITransport sender, Exception command)
+		{
+			readyCountDownLatch.countDown();
+			this.exceptionHandler(sender, command);
+		}
+	}
 }
 



Mime
View raw message