activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r1312050 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/Connection.cs src/test/csharp/TempDestinationTest.cs
Date Tue, 10 Apr 2012 23:56:05 GMT
Author: jgomes
Date: Tue Apr 10 23:56:04 2012
New Revision: 1312050

URL: http://svn.apache.org/viewvc?rev=1312050&view=rev
Log:
Merged revision(s) 1312026 from activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:
Make a copy of the temporary destinations that are being cleaned up when closing a Connection
to avoid a race condition of modifying the list of temp destinations while it is being enumerated.
Found that the AdvisoryConsumer monitor was creating a race condition while populating the
internal temp destination list associated with a Connection so that the Connection member
reference was not being set on the temp destination, and subsequently the temp destination
would not be cleaned up when the Connection was closed.
Resolves AMQNET-378.
........

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/
------------------------------------------------------------------------------
  Merged /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:r1312026

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1312050&r1=1312049&r2=1312050&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Apr 10 23:56:04 2012
@@ -48,8 +48,8 @@ namespace Apache.NMS.ActiveMQ
 		private bool sendAcksAsync = false;
 		private bool dispatchAsync = true;
 		private int producerWindowSize = 0;
-		private bool messagePrioritySupported=true;
-        private bool watchTopicAdviosires = true;
+		private bool messagePrioritySupported = true;
+		private bool watchTopicAdviosires = true;
 
 		private bool userSpecifiedClientID;
 		private readonly Uri brokerUri;
@@ -62,7 +62,7 @@ namespace Apache.NMS.ActiveMQ
 		private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
 		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
 		private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable());
-        private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable());
+		private readonly IDictionary tempDests = Hashtable.Synchronized(new Hashtable());
 		private readonly object connectedLock = new object();
 		private readonly Atomic<bool> connected = new Atomic<bool>(false);
 		private readonly Atomic<bool> closed = new Atomic<bool>(false);
@@ -79,11 +79,11 @@ namespace Apache.NMS.ActiveMQ
 		private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
 		private ICompressionPolicy compressionPolicy = new CompressionPolicy();
 		private readonly IdGenerator clientIdGenerator;
-        private int consumerIdCounter = 0;
+		private int consumerIdCounter = 0;
 		private volatile CountDownLatch transportInterruptionProcessingComplete;
 		private readonly MessageTransformation messageTransformation;
 		private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
-        private AdvisoryConsumer advisoryConsumer = null;
+		private AdvisoryConsumer advisoryConsumer = null;
 
 		public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
 		{
@@ -326,11 +326,11 @@ namespace Apache.NMS.ActiveMQ
 			set { this.dispatchAsync = value; }
 		}
 
-        public bool WatchTopicAdvisories
-        {
-            get { return this.watchTopicAdviosires; }
-            set { this.watchTopicAdviosires = value; }
-        }
+		public bool WatchTopicAdvisories
+		{
+			get { return this.watchTopicAdviosires; }
+			set { this.watchTopicAdviosires = value; }
+		}
 
 		public string ClientId
 		{
@@ -547,11 +547,11 @@ namespace Apache.NMS.ActiveMQ
 					Tracer.Info("Connection.Close(): Closing Connection Now.");
 					this.closing.Value = true;
 
-                    if(this.advisoryConsumer != null)
-                    {
-                        this.advisoryConsumer.Dispose();
-                        this.advisoryConsumer = null;
-                    }
+					if(this.advisoryConsumer != null)
+					{
+						this.advisoryConsumer.Dispose();
+						this.advisoryConsumer = null;
+					}
 
 					lock(sessions.SyncRoot)
 					{
@@ -562,10 +562,18 @@ namespace Apache.NMS.ActiveMQ
 					}
 					sessions.Clear();
 
-                    foreach(ActiveMQTempDestination dest in this.tempDests.Values)
-                    {
-                        dest.Delete();
-                    }
+					if(this.tempDests.Count > 0)
+					{
+						// Make a copy of the destinations to delete, because the act of deleting
+						// them will modify the collection.
+						ActiveMQTempDestination[] tempDestsToDelete = new ActiveMQTempDestination[this.tempDests.Count];
+
+						this.tempDests.Values.CopyTo(tempDestsToDelete, 0);
+						foreach(ActiveMQTempDestination dest in tempDestsToDelete)
+						{
+							dest.Delete();
+						}
+					}
 
 					// Connected is true only when we've successfully sent our ConnectionInfo
 					// to the broker, so if we haven't announced ourselves there's no need to
@@ -687,7 +695,7 @@ namespace Apache.NMS.ActiveMQ
 				Response response = transport.Request(command, requestTimeout);
 				if(response is ExceptionResponse)
 				{
-					ExceptionResponse exceptionResponse = (ExceptionResponse) response;
+					ExceptionResponse exceptionResponse = (ExceptionResponse)response;
                     Exception exception = CreateExceptionFromResponse(exceptionResponse);
 					throw exception;
 				}
@@ -790,12 +798,12 @@ namespace Apache.NMS.ActiveMQ
 										if(!(response is ExceptionResponse))
 										{
 											connected.Value = true;
-                                            if(this.watchTopicAdviosires)
-                                            {
-                                                ConsumerId id = new ConsumerId(
-                                                    new SessionId(info.ConnectionId, -1),
-                                                    Interlocked.Increment(ref this.consumerIdCounter));
-                                                this.advisoryConsumer = new AdvisoryConsumer(this, id);
+											if(this.watchTopicAdviosires)
+											{
+												ConsumerId id = new ConsumerId(
+													new SessionId(info.ConnectionId, -1),
+													Interlocked.Increment(ref this.consumerIdCounter));
+												this.advisoryConsumer = new AdvisoryConsumer(this, id);
                                             }
 										}
                                         else
@@ -852,19 +860,19 @@ namespace Apache.NMS.ActiveMQ
 			if(command.IsMessageDispatch)
 			{
 				WaitForTransportInterruptionProcessingToComplete();
-				DispatchMessage((MessageDispatch) command);
+				DispatchMessage((MessageDispatch)command);
 			}
 			else if(command.IsKeepAliveInfo)
 			{
-				OnKeepAliveCommand(commandTransport, (KeepAliveInfo) command);
+				OnKeepAliveCommand(commandTransport, (KeepAliveInfo)command);
 			}
 			else if(command.IsWireFormatInfo)
 			{
-				this.brokerWireFormatInfo = (WireFormatInfo) command;
+				this.brokerWireFormatInfo = (WireFormatInfo)command;
 			}
 			else if(command.IsBrokerInfo)
 			{
-				this.brokerInfo = (BrokerInfo) command;
+				this.brokerInfo = (BrokerInfo)command;
 				this.brokerInfoReceived.countDown();
 			}
 			else if(command.IsShutdownInfo)
@@ -876,7 +884,7 @@ namespace Apache.NMS.ActiveMQ
 			}
 			else if(command.IsProducerAck)
 			{
-				ProducerAck ack = (ProducerAck) command as ProducerAck;
+				ProducerAck ack = (ProducerAck)command as ProducerAck;
 				if(ack.ProducerId != null)
 				{
 					MessageProducer producer = producers[ack.ProducerId] as MessageProducer;
@@ -895,7 +903,7 @@ namespace Apache.NMS.ActiveMQ
 			{
 				if(!closing.Value && !closed.Value)
 				{
-					ConnectionError connectionError = (ConnectionError) command;
+					ConnectionError connectionError = (ConnectionError)command;
 					BrokerError brokerError = connectionError.Exception;
 					string message = "Broker connection error.";
 					string cause = "";
@@ -925,7 +933,7 @@ namespace Apache.NMS.ActiveMQ
 			{
 				if(dispatchers.Contains(dispatch.ConsumerId))
 				{
-					IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId];
+					IDispatcher dispatcher = (IDispatcher)dispatchers[dispatch.ConsumerId];
 
 					// Can be null when a consumer has sent a MessagePull and there was
 					// no available message at the broker to dispatch or when signalled
@@ -978,7 +986,7 @@ namespace Apache.NMS.ActiveMQ
 					{
 						error = NMSExceptionSupport.Create(error);
 					}
-					NMSException e = (NMSException) error;
+					NMSException e = (NMSException)error;
 
 					// Called in another thread so that processing can continue
 					// here, ensures no lock contention.
@@ -1066,12 +1074,12 @@ namespace Apache.NMS.ActiveMQ
 		{
 			Tracer.Debug("Connection: Transport has been Interrupted.");
 
-            // Ensure that if there's an advisory consumer we don't add it to the
-            // set of consumers that need interruption processing.
+			// Ensure that if there's an advisory consumer we don't add it to the
+			// set of consumers that need interruption processing.
 			this.transportInterruptionProcessingComplete =
-                new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0));
+				new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0));
 
-            if(Tracer.IsDebugEnabled)
+			if(Tracer.IsDebugEnabled)
 			{
 				Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
 			}
@@ -1172,27 +1180,27 @@ namespace Apache.NMS.ActiveMQ
 
 			this.SyncRequest(command);
 
+			destination = this.AddTempDestination(destination);
 			destination.Connection = this;
-            this.AddTempDestination(destination);
 
 			return destination;
 		}
 
 		public void DeleteTemporaryDestination(IDestination destination)
 		{
-            CheckClosedOrFailed();
+			CheckClosedOrFailed();
 
-            ActiveMQTempDestination temp = destination as ActiveMQTempDestination;
+			ActiveMQTempDestination temp = destination as ActiveMQTempDestination;
 
-            foreach(Session session in this.sessions)
-            {
-                if(session.IsInUse(temp))
-                {
-                    throw new NMSException("A consumer is consuming from the temporary destination");
-                }
-            }
+			foreach(Session session in this.sessions)
+			{
+				if(session.IsInUse(temp))
+				{
+					throw new NMSException("A consumer is consuming from the temporary destination");
+				}
+			}
 
-            this.tempDests.Remove(destination as ActiveMQTempDestination);
+			this.tempDests.Remove(destination as ActiveMQTempDestination);
 			this.DeleteDestination(destination);
 		}
 
@@ -1201,7 +1209,7 @@ namespace Apache.NMS.ActiveMQ
 			DestinationInfo command = new DestinationInfo();
 			command.ConnectionId = this.ConnectionId;
 			command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
-			command.Destination = (ActiveMQDestination) destination;
+			command.Destination = (ActiveMQDestination)destination;
 
 			this.Oneway(command);
 		}
@@ -1278,49 +1286,57 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
-        internal void AddTempDestination(ActiveMQTempDestination dest)
-        {
-            // .NET lacks a putIfAbsent operation for Maps.
-            lock(tempDests.SyncRoot)
-            {
-                if(!this.tempDests.Contains(dest))
-                {
-                    this.tempDests.Add(dest, dest);
-                }
-            }
-        }
+		internal ActiveMQTempDestination AddTempDestination(ActiveMQTempDestination dest)
+		{
+			ActiveMQTempDestination addedDest = dest;
 
-        internal void RemoveTempDestination(ActiveMQTempDestination dest)
-        {
-            this.tempDests.Remove(dest);
-        }
+			// .NET lacks a putIfAbsent operation for Maps.
+			lock(tempDests.SyncRoot)
+			{
+				if(!this.tempDests.Contains(dest))
+				{
+					this.tempDests.Add(dest, dest);
+				}
+				else
+				{
+					addedDest = this.tempDests[dest] as ActiveMQTempDestination;
+				}
+			}
 
-        internal bool IsTempDestinationActive(ActiveMQTempDestination dest)
-        {
-            if(this.advisoryConsumer == null)
-            {
-                return true;
-            }
+			return addedDest;
+		}
 
-            return this.tempDests.Contains(dest);
-        }
+		internal void RemoveTempDestination(ActiveMQTempDestination dest)
+		{
+			this.tempDests.Remove(dest);
+		}
 
-        protected void CheckClosedOrFailed()
-        {
-            CheckClosed();
-            if (transportFailed.Value)
-            {
-                throw new ConnectionFailedException(firstFailureError.Message);
-            }
-        }
+		internal bool IsTempDestinationActive(ActiveMQTempDestination dest)
+		{
+			if(this.advisoryConsumer == null)
+			{
+				return true;
+			}
 
-        protected void CheckClosed()
-        {
-            if(closed.Value)
-            {
-                throw new ConnectionClosedException();
-            }
-        }
+			return this.tempDests.Contains(dest);
+		}
+
+		protected void CheckClosedOrFailed()
+		{
+			CheckClosed();
+			if(transportFailed.Value)
+			{
+				throw new ConnectionFailedException(firstFailureError.Message);
+			}
+		}
+
+		protected void CheckClosed()
+		{
+			if(closed.Value)
+			{
+				throw new ConnectionClosedException();
+			}
+		}
 
         private NMSException CreateExceptionFromResponse(ExceptionResponse exceptionResponse)
         {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs?rev=1312050&r1=1312049&r2=1312050&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/TempDestinationTest.cs Tue Apr 10 23:56:04 2012
@@ -29,34 +29,34 @@ using NUnit.Framework;
 
 namespace Apache.NMS.ActiveMQ.Test
 {
-    [TestFixture]
-    public class TempDestinationTest : NMSTestSupport
-    {
-        private readonly IList connections = ArrayList.Synchronized(new ArrayList());
-
-        [SetUp]
-        public override void SetUp()
-        {
-            base.SetUp();
-        }
-
-        [TearDown]
-        public override void TearDown()
-        {
-            foreach(Connection connection in connections)
-            {
-                try
-                {
-                    connection.Close();
-                }
-                catch
-                {
-                }
-            }
-
-            connections.Clear();
-            base.TearDown();
-        }
+	[TestFixture]
+	public class TempDestinationTest : NMSTestSupport
+	{
+		private readonly IList connections = ArrayList.Synchronized(new ArrayList());
+
+		[SetUp]
+		public override void SetUp()
+		{
+			base.SetUp();
+		}
+
+		[TearDown]
+		public override void TearDown()
+		{
+			foreach(Connection connection in connections)
+			{
+				try
+				{
+					connection.Close();
+				}
+				catch
+				{
+				}
+			}
+
+			connections.Clear();
+			base.TearDown();
+		}
 
 		private Connection GetNewConnection()
 		{
@@ -66,212 +66,212 @@ namespace Apache.NMS.ActiveMQ.Test
 		}
 
 		/// <summary>
-        /// Make sure Temp destination can only be consumed by local connection
-        /// </summary>
-        [Test]
-        public void TestTempDestOnlyConsumedByLocalConn()
-        {
+		/// Make sure Temp destination can only be consumed by local connection
+		/// </summary>
+		[Test]
+		public void TestTempDestOnlyConsumedByLocalConn()
+		{
 			Connection connection = GetNewConnection();
 			connection.Start();
 
-            ISession tempSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            ITemporaryQueue queue = tempSession.CreateTemporaryQueue();
-            IMessageProducer producer = tempSession.CreateProducer(queue);
-            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
-            ITextMessage message = tempSession.CreateTextMessage("First");
-            producer.Send(message);
+			ISession tempSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			ITemporaryQueue queue = tempSession.CreateTemporaryQueue();
+			IMessageProducer producer = tempSession.CreateProducer(queue);
+			producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+			ITextMessage message = tempSession.CreateTextMessage("First");
+			producer.Send(message);
 
-            // temp destination should not be consume when using another connection
+			// temp destination should not be consume when using another connection
 			Connection otherConnection = GetNewConnection();
-            ISession otherSession = otherConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            ITemporaryQueue otherQueue = otherSession.CreateTemporaryQueue();
-            IMessageConsumer consumer = otherSession.CreateConsumer(otherQueue);
-            IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(3000));
-            Assert.IsNull(msg);
-
-            // should throw InvalidDestinationException when consuming a temp
-            // destination from another connection
-            try
-            {
-                consumer = otherSession.CreateConsumer(queue);
-                Assert.Fail("Send should fail since temp destination should be used from another connection");
-            }
-            catch(InvalidDestinationException)
-            {
-                Assert.IsTrue(true, "failed to throw an exception");
-            }
-
-            // should be able to consume temp destination from the same connection
-            consumer = tempSession.CreateConsumer(queue);
-            msg = consumer.Receive(TimeSpan.FromMilliseconds(3000));
-            Assert.NotNull(msg);
-        }
-
-        /// <summary>
-        /// Make sure that a temp queue does not drop message if there is an active consumers.
-        /// </summary>
-        [Test]
-        public void TestTempQueueHoldsMessagesWithConsumers()
-        {
+			ISession otherSession = otherConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			ITemporaryQueue otherQueue = otherSession.CreateTemporaryQueue();
+			IMessageConsumer consumer = otherSession.CreateConsumer(otherQueue);
+			IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+			Assert.IsNull(msg);
+
+			// should throw InvalidDestinationException when consuming a temp
+			// destination from another connection
+			try
+			{
+				consumer = otherSession.CreateConsumer(queue);
+				Assert.Fail("Send should fail since temp destination should be used from another connection");
+			}
+			catch(InvalidDestinationException)
+			{
+				Assert.IsTrue(true, "failed to throw an exception");
+			}
+
+			// should be able to consume temp destination from the same connection
+			consumer = tempSession.CreateConsumer(queue);
+			msg = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+			Assert.NotNull(msg);
+		}
+
+		/// <summary>
+		/// Make sure that a temp queue does not drop message if there is an active consumers.
+		/// </summary>
+		[Test]
+		public void TestTempQueueHoldsMessagesWithConsumers()
+		{
 			Connection connection = GetNewConnection();
 			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            IQueue queue = session.CreateTemporaryQueue();
-            IMessageConsumer consumer = session.CreateConsumer(queue);
-            connection.Start();
-
-            IMessageProducer producer = session.CreateProducer(queue);
-            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
-            ITextMessage message = session.CreateTextMessage("Hello");
-            producer.Send(message);
-
-            IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(1000));
-            Assert.IsNotNull(message2);
-            Assert.IsTrue(message2 is ITextMessage, "Expected message to be a TextMessage");
-            Assert.IsTrue(((ITextMessage)message2).Text.Equals(message.Text),
-                          "Expected message to be a '" + message.Text + "'");
-        }
-
-        /// <summary>
-        /// Make sure that a temp queue does not drop message if there are no active consumers.
-        /// </summary>
-        [Test]
-        public void TestTempQueueHoldsMessagesWithoutConsumers()
-        {
+			IQueue queue = session.CreateTemporaryQueue();
+			IMessageConsumer consumer = session.CreateConsumer(queue);
+			connection.Start();
+
+			IMessageProducer producer = session.CreateProducer(queue);
+			producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+			ITextMessage message = session.CreateTextMessage("Hello");
+			producer.Send(message);
+
+			IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+			Assert.IsNotNull(message2);
+			Assert.IsTrue(message2 is ITextMessage, "Expected message to be a TextMessage");
+			Assert.IsTrue(((ITextMessage)message2).Text.Equals(message.Text),
+						  "Expected message to be a '" + message.Text + "'");
+		}
+
+		/// <summary>
+		/// Make sure that a temp queue does not drop message if there are no active consumers.
+		/// </summary>
+		[Test]
+		public void TestTempQueueHoldsMessagesWithoutConsumers()
+		{
 			Connection connection = GetNewConnection();
 			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            IQueue queue = session.CreateTemporaryQueue();
-            IMessageProducer producer = session.CreateProducer(queue);
-            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
-            ITextMessage message = session.CreateTextMessage("Hello");
-            producer.Send(message);
-    
-            connection.Start();
-            IMessageConsumer consumer = session.CreateConsumer(queue);
-            IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(3000));
-            Assert.IsNotNull(message2);
-            Assert.IsTrue(message2 is ITextMessage, "Expected message to be a TextMessage");
-            Assert.IsTrue(((ITextMessage)message2).Text.Equals(message.Text),
-                          "Expected message to be a '" + message.Text + "'");
-    
-        }
-    
-        /// <summary>
-        /// Test temp queue works under load
-        /// </summary>
-        [Test]
-        public void TestTmpQueueWorksUnderLoad()
-        {
-            int count = 500;
-            int dataSize = 1024;
-    
-            ArrayList list = new ArrayList(count);
+			IQueue queue = session.CreateTemporaryQueue();
+			IMessageProducer producer = session.CreateProducer(queue);
+			producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+			ITextMessage message = session.CreateTextMessage("Hello");
+			producer.Send(message);
+
+			connection.Start();
+			IMessageConsumer consumer = session.CreateConsumer(queue);
+			IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+			Assert.IsNotNull(message2);
+			Assert.IsTrue(message2 is ITextMessage, "Expected message to be a TextMessage");
+			Assert.IsTrue(((ITextMessage)message2).Text.Equals(message.Text),
+						  "Expected message to be a '" + message.Text + "'");
+
+		}
+
+		/// <summary>
+		/// Test temp queue works under load
+		/// </summary>
+		[Test]
+		public void TestTmpQueueWorksUnderLoad()
+		{
+			int count = 500;
+			int dataSize = 1024;
+
+			ArrayList list = new ArrayList(count);
 			Connection connection = GetNewConnection();
 			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            IQueue queue = session.CreateTemporaryQueue();
-            IMessageProducer producer = session.CreateProducer(queue);
-            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
-
-            byte[] data = new byte[dataSize];
-            for (int i = 0; i < count; i++)
-            {
-                IBytesMessage message = session.CreateBytesMessage();
-                message.WriteBytes(data);
-                message.Properties.SetInt("c", i);
-                producer.Send(message);
-                list.Add(message);
-            }
-
-            connection.Start();
-            IMessageConsumer consumer = session.CreateConsumer(queue);
-            for (int i = 0; i < count; i++)
-            {
-                IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(2000));
-                Assert.IsTrue(message2 != null);
-                Assert.AreEqual(i, message2.Properties.GetInt("c"));
-                Assert.IsTrue(message2.Equals(list[i]));
-            }
-        }
-    
-        /// <summary>
-        /// Make sure you cannot publish to a temp destination that does not exist anymore.
-        /// </summary>
-        [Test]
-        public void TestPublishFailsForClosedConnection()
-        {
+			IQueue queue = session.CreateTemporaryQueue();
+			IMessageProducer producer = session.CreateProducer(queue);
+			producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+			byte[] data = new byte[dataSize];
+			for(int i = 0; i < count; i++)
+			{
+				IBytesMessage message = session.CreateBytesMessage();
+				message.WriteBytes(data);
+				message.Properties.SetInt("c", i);
+				producer.Send(message);
+				list.Add(message);
+			}
+
+			connection.Start();
+			IMessageConsumer consumer = session.CreateConsumer(queue);
+			for(int i = 0; i < count; i++)
+			{
+				IMessage message2 = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+				Assert.IsTrue(message2 != null);
+				Assert.AreEqual(i, message2.Properties.GetInt("c"));
+				Assert.IsTrue(message2.Equals(list[i]));
+			}
+		}
+
+		/// <summary>
+		/// Make sure you cannot publish to a temp destination that does not exist anymore.
+		/// </summary>
+		[Test]
+		public void TestPublishFailsForClosedConnection()
+		{
 			Connection connection = GetNewConnection();
 			Connection tempConnection = GetNewConnection();
-            ISession tempSession = tempConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            ITemporaryQueue queue = tempSession.CreateTemporaryQueue();
+			ISession tempSession = tempConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			ITemporaryQueue queue = tempSession.CreateTemporaryQueue();
 
-            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            connection.Start();
+			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			connection.Start();
 
-            // This message delivery should work since the temp connection is still
-            // open.
-            IMessageProducer producer = session.CreateProducer(queue);
-            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
-            ITextMessage message = session.CreateTextMessage("First");
-            producer.Send(message);
-            Thread.Sleep(1000);
-
-            // Closing the connection should destroy the temp queue that was
-            // created.
-            tempConnection.Close();
-            Thread.Sleep(5000); // Wait a little bit to let the delete take effect.
-    
-            // This message delivery NOT should work since the temp connection is
-            // now closed.
-            try
-            {
-                message = session.CreateTextMessage("Hello");
-                producer.Send(message);
-                Assert.Fail("Send should fail since temp destination should not exist anymore.");
-            }
-            catch(NMSException e)
-            {
-                Tracer.Debug("Test threw expected exception: " + e.Message);
-            }
-        }
-    
-        /// <summary>
-        /// Make sure you cannot publish to a temp destination that does not exist anymore.
-        /// </summary>
-        [Test]
-        public void TestPublishFailsForDestoryedTempDestination()
-        {
+			// This message delivery should work since the temp connection is still
+			// open.
+			IMessageProducer producer = session.CreateProducer(queue);
+			producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+			ITextMessage message = session.CreateTextMessage("First");
+			producer.Send(message);
+			Thread.Sleep(1000);
+
+			// Closing the connection should destroy the temp queue that was
+			// created.
+			tempConnection.Close();
+			Thread.Sleep(5000); // Wait a little bit to let the delete take effect.
+
+			// This message delivery NOT should work since the temp connection is
+			// now closed.
+			try
+			{
+				message = session.CreateTextMessage("Hello");
+				producer.Send(message);
+				Assert.Fail("Send should fail since temp destination should not exist anymore.");
+			}
+			catch(NMSException e)
+			{
+				Tracer.Debug("Test threw expected exception: " + e.Message);
+			}
+		}
+
+		/// <summary>
+		/// Make sure you cannot publish to a temp destination that does not exist anymore.
+		/// </summary>
+		[Test]
+		public void TestPublishFailsForDestroyedTempDestination()
+		{
 			Connection connection = GetNewConnection();
 			Connection tempConnection = GetNewConnection();
-            ISession tempSession = tempConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            ITemporaryQueue queue = tempSession.CreateTemporaryQueue();
-    
-            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            connection.Start();
-
-            // This message delivery should work since the temp connection is still
-            // open.
-            IMessageProducer producer = session.CreateProducer(queue);
-            producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
-            ITextMessage message = session.CreateTextMessage("First");
-            producer.Send(message);
-            Thread.Sleep(1000);
-
-            // deleting the Queue will cause sends to fail
-            queue.Delete();
-            Thread.Sleep(5000); // Wait a little bit to let the delete take effect.
-    
-            // This message delivery NOT should work since the temp connection is
-            // now closed.
-            try
-            {
-                message = session.CreateTextMessage("Hello");
-                producer.Send(message);
-                Assert.Fail("Send should fail since temp destination should not exist anymore.");
-            }
-            catch(NMSException e)
-            {
-                Tracer.Debug("Test threw expected exception: " + e.Message);
-            }
-        }
+			ISession tempSession = tempConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			ITemporaryQueue queue = tempSession.CreateTemporaryQueue();
+
+			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			connection.Start();
+
+			// This message delivery should work since the temp connection is still
+			// open.
+			IMessageProducer producer = session.CreateProducer(queue);
+			producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+			ITextMessage message = session.CreateTextMessage("First");
+			producer.Send(message);
+			Thread.Sleep(1000);
+
+			// deleting the Queue will cause sends to fail
+			queue.Delete();
+			Thread.Sleep(5000); // Wait a little bit to let the delete take effect.
+
+			// This message delivery NOT should work since the temp connection is
+			// now closed.
+			try
+			{
+				message = session.CreateTextMessage("Hello");
+				producer.Send(message);
+				Assert.Fail("Send should fail since temp destination should not exist anymore.");
+			}
+			catch(NMSException e)
+			{
+				Tracer.Debug("Test threw expected exception: " + e.Message);
+			}
+		}
 
 		/// <summary>
 		/// Make sure consumers work after a publisher fails to publish to deleted temp destination.
@@ -311,7 +311,7 @@ namespace Apache.NMS.ActiveMQ.Test
 
 				connections.Remove(producerConnection);
 				producerConnection.Close();
-				//Thread.Sleep(2000); // Wait a little bit to let the delete take effect.
+				Thread.Sleep(1000); // Wait a little bit to let the delete take effect.
 
 				// This message delivery NOT should work since the temp destination was removed by closing the connection.
 				try
@@ -328,98 +328,117 @@ namespace Apache.NMS.ActiveMQ.Test
 		}
 
 		/// <summary>
-        /// Test you can't delete a Destination with Active Subscribers
-        /// </summary>
-        [Test]
-        public void TestDeleteDestinationWithSubscribersFails()
-        {
+		/// Test you can't delete a Destination with Active Subscribers
+		/// </summary>
+		[Test]
+		public void TestDeleteDestinationWithSubscribersFails()
+		{
 			Connection connection = GetNewConnection();
-            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            ITemporaryQueue queue = session.CreateTemporaryQueue();
+			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			ITemporaryQueue queue = session.CreateTemporaryQueue();
+
+			connection.Start();
+
+			session.CreateConsumer(queue);
 
-            connection.Start();
+			try
+			{
+				queue.Delete();
+				Assert.Fail("Should fail as Subscribers are active");
+			}
+			catch(NMSException)
+			{
+				Assert.IsTrue(true, "failed to throw an exception");
+			}
+		}
 
-            session.CreateConsumer(queue);
+		/// <summary>
+		/// Test clean up of multiple temp destinations
+		/// </summary>
+		[Test]
+		public void TestCloseConnectionWithTempQueues()
+		{
+			List<ITemporaryQueue> listTempQueues = new List<ITemporaryQueue>();
+			IConnection connection = CreateConnection();
+			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
 
-            try
-            {
-                queue.Delete();
-                Assert.Fail("Should fail as Subscribers are active");
-            }
-            catch(NMSException)
-            {
-                Assert.IsTrue(true, "failed to throw an exception");
-            }
-        }
-
-        [Test]
-        public void TestConnectionCanPurgeTempDestinations()
-        {
-            Connection connection = CreateConnection() as Connection;
-            connections.Add(connection);
-            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            IMessageConsumer advisoryConsumer = session.CreateConsumer(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
-            advisoryConsumer.Listener += OnAdvisoryMessage;
-
-            connection.Start();
-
-            for(int i = 0; i < 10; ++i)
-            {
-                ITemporaryTopic tempTopic = session.CreateTemporaryTopic();
-                Tracer.Debug("Created TempDestination: " + tempTopic);
-            }
-
-            // Create one from an alternate connection, it shouldn't get purged
-            // so we should have one less removed than added entries.
-            Connection connection2 = CreateConnection() as Connection;
-            ISession session2 = connection2.CreateSession(AcknowledgementMode.AutoAcknowledge);
-            ITemporaryTopic tempTopic2 = session2.CreateTemporaryTopic();
-
-            Thread.Sleep(4000);
-            Assert.IsTrue(tempDestsAdded.Count == 11);
-
-            connection.PurgeTempDestinations();
-
-            Thread.Sleep(4000);
-            Assert.IsTrue(tempDestsRemoved.Count == 10);
-        }
-
-        private readonly IList tempDestsAdded = ArrayList.Synchronized(new ArrayList());
-        private readonly IList tempDestsRemoved = ArrayList.Synchronized(new ArrayList());
-
-        private void OnAdvisoryMessage(IMessage msg)
-        {
-            Message message = msg as Message;
-            DestinationInfo destInfo = message.DataStructure as DestinationInfo;
-
-            if(destInfo != null)
-            {
-                ActiveMQDestination dest = destInfo.Destination;
-                if(!dest.IsTemporary)
-                {
-                    return;
-                }
-    
-                ActiveMQTempDestination tempDest = dest as ActiveMQTempDestination;
-                if(destInfo.OperationType == DestinationInfo.ADD_OPERATION_TYPE)
-                {
-                    if(Tracer.IsDebugEnabled)
-                    {
-                        Tracer.Debug("Connection adding: " + tempDest);
-                    }
-                    this.tempDestsAdded.Add(tempDest);
-                }
-                else if(destInfo.OperationType == DestinationInfo.REMOVE_OPERATION_TYPE)
-                {
-                    if(Tracer.IsDebugEnabled)
-                    {
-                        Tracer.Debug("Connection removing: " + tempDest);
-                    }
-                    this.tempDestsRemoved.Add(tempDest);
-                }
-            }
-        }
+			connection.Start();
 
-    }
+			for(int index = 0; index < 25; index++)
+			{
+				listTempQueues.Add(session.CreateTemporaryQueue());
+			}
+
+			connection.Close();
+		}
+
+		[Test]
+		public void TestConnectionCanPurgeTempDestinations()
+		{
+			Connection connection = CreateConnection() as Connection;
+			connections.Add(connection);
+			ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			IMessageConsumer advisoryConsumer = session.CreateConsumer(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+			advisoryConsumer.Listener += OnAdvisoryMessage;
+
+			connection.Start();
+
+			for(int i = 0; i < 10; ++i)
+			{
+				ITemporaryTopic tempTopic = session.CreateTemporaryTopic();
+				Tracer.Debug("Created TempDestination: " + tempTopic);
+			}
+
+			// Create one from an alternate connection, it shouldn't get purged
+			// so we should have one less removed than added entries.
+			Connection connection2 = CreateConnection() as Connection;
+			ISession session2 = connection2.CreateSession(AcknowledgementMode.AutoAcknowledge);
+			ITemporaryTopic tempTopic2 = session2.CreateTemporaryTopic();
+
+			Thread.Sleep(4000);
+			Assert.IsTrue(tempDestsAdded.Count == 11);
+
+			connection.PurgeTempDestinations();
+
+			Thread.Sleep(4000);
+			Assert.IsTrue(tempDestsRemoved.Count == 10);
+		}
+
+		private readonly IList tempDestsAdded = ArrayList.Synchronized(new ArrayList());
+		private readonly IList tempDestsRemoved = ArrayList.Synchronized(new ArrayList());
+
+		private void OnAdvisoryMessage(IMessage msg)
+		{
+			Message message = msg as Message;
+			DestinationInfo destInfo = message.DataStructure as DestinationInfo;
+
+			if(destInfo != null)
+			{
+				ActiveMQDestination dest = destInfo.Destination;
+				if(!dest.IsTemporary)
+				{
+					return;
+				}
+
+				ActiveMQTempDestination tempDest = dest as ActiveMQTempDestination;
+				if(destInfo.OperationType == DestinationInfo.ADD_OPERATION_TYPE)
+				{
+					if(Tracer.IsDebugEnabled)
+					{
+						Tracer.Debug("Connection adding: " + tempDest);
+					}
+					this.tempDestsAdded.Add(tempDest);
+				}
+				else if(destInfo.OperationType == DestinationInfo.REMOVE_OPERATION_TYPE)
+				{
+					if(Tracer.IsDebugEnabled)
+					{
+						Tracer.Debug("Connection removing: " + tempDest);
+					}
+					this.tempDestsRemoved.Add(tempDest);
+				}
+			}
+		}
+	}
 }
 



Mime
View raw message