activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r1576997 - in /activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src: main/csharp/ test/csharp/
Date Wed, 12 Mar 2014 23:19:07 GMT
Author: jgomes
Date: Wed Mar 12 23:19:06 2014
New Revision: 1576997

URL: http://svn.apache.org/r1576997
Log:
Add IDisposable interface to IDestination.
Fixes [AMQNET-473]. (See https://issues.apache.org/jira/browse/AMQNET-473)
Complete provider implementation for ZeroMQ.
Fixes [AMQNET-333]. (See https://issues.apache.org/jira/browse/AMQNET-333)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Wed Mar 12
23:19:06 2014
@@ -34,6 +34,8 @@ namespace Apache.NMS.ZMQ
 		protected ZmqSocket consumerEndpoint = null;
 		protected string destinationName;
 
+		private bool disposed = false;
+
 		/// <summary>
 		/// Construct the Destination with a defined physical name.
 		/// </summary>
@@ -46,16 +48,58 @@ namespace Apache.NMS.ZMQ
 
 		~Destination()
 		{
-			// TODO: Implement IDisposable pattern
+			Dispose(false);
+		}
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		private void Dispose(bool disposing)
+		{
+			if(disposed)
+			{
+				return;
+			}
+
+			if(disposing)
+			{
+				try
+				{
+					OnDispose();
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Exception disposing Destination {0}: {1}", this.Name, ex.Message);
+				}
+			}
+
+			disposed = true;
+		}
+
+		/// <summary>
+		/// Child classes can override this method to perform clean-up logic.
+		/// </summary>
+		protected virtual void OnDispose()
+		{
 			if(null != this.producerEndpoint)
 			{
-				this.session.Connection.ReleaseProducer(this.producerEndpoint);
+				if(null != this.session
+					&& null != this.session.Connection)
+				{
+					this.session.Connection.ReleaseProducer(this.producerEndpoint);
+				}
+
+				this.producerEndpoint.Dispose();
 				this.producerEndpoint = null;
 			}
 
 			if(null != this.consumerEndpoint)
 			{
 				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
+				this.consumerEndpoint.Dispose();
 				this.consumerEndpoint = null;
 			}
 		}
@@ -69,8 +113,8 @@ namespace Apache.NMS.ZMQ
 		{
 			get
 			{
-				return DestinationType == DestinationType.Topic
-					|| DestinationType == DestinationType.TemporaryTopic;
+				return this.DestinationType == DestinationType.Topic
+					|| this.DestinationType == DestinationType.TemporaryTopic;
 			}
 		}
 
@@ -78,8 +122,8 @@ namespace Apache.NMS.ZMQ
 		{
 			get
 			{
-				return DestinationType == DestinationType.Queue
-					|| DestinationType == DestinationType.TemporaryQueue;
+				return this.DestinationType == DestinationType.Queue
+					|| this.DestinationType == DestinationType.TemporaryQueue;
 			}
 		}
 
@@ -87,34 +131,8 @@ namespace Apache.NMS.ZMQ
 		{
 			get
 			{
-				return DestinationType == DestinationType.TemporaryQueue
-					|| DestinationType == DestinationType.TemporaryTopic;
-			}
-		}
-
-		/// <summary>
-		/// </summary>
-		/// <returns>string representation of this instance</returns>
-		public override string ToString()
-		{
-			return MakeUriString(this.destinationName);
-		}
-
-		private string MakeUriString(string destName)
-		{
-			switch(DestinationType)
-			{
-			case DestinationType.Topic:
-				return "topic://" + destName;
-
-			case DestinationType.TemporaryTopic:
-				return "temp-topic://" + destName;
-
-			case DestinationType.TemporaryQueue:
-				return "temp-queue://" + destName;
-
-			default:
-				return "queue://" + destName;
+				return this.DestinationType == DestinationType.TemporaryQueue
+					|| this.DestinationType == DestinationType.TemporaryTopic;
 			}
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Mar
12 23:19:06 2014
@@ -49,35 +49,35 @@ namespace Apache.NMS.ZMQ
 			set { this.consumerTransformer = value; }
 		}
 
-		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination
dest, string selector)
+		public MessageConsumer(Session sess, AcknowledgementMode ackMode, IDestination dest, string
selector)
 		{
 			// UNUSED_PARAM(selector);		// Selectors are not currently supported
 
-			if(null == session.Connection.Context)
+			if(null == sess.Connection.Context)
 			{
 				throw new NMSConnectionException();
 			}
 
-			this.session = session;
+			this.session = sess;
 			this.destination = (Destination) dest;
-			this.acknowledgementMode = acknowledgementMode;
+			this.acknowledgementMode = ackMode;
 		}
 
 		public event MessageListener Listener
 		{
 			add
 			{
-				listener += value;
-				listenerCount++;
+				this.listener += value;
+				this.listenerCount++;
 				StartAsyncDelivery();
 			}
 
 			remove
 			{
-				if(listenerCount > 0)
+				if(this.listenerCount > 0)
 				{
-					listener -= value;
-					listenerCount--;
+					this.listener -= value;
+					this.listenerCount--;
 				}
 
 				if(0 == listenerCount)
@@ -152,18 +152,18 @@ namespace Apache.NMS.ZMQ
 		{
 			lock(asyncDeliveryLock)
 			{
-				asyncDelivery = false;
-				if(null != asyncDeliveryThread)
+				this.asyncDelivery = false;
+				if(null != this.asyncDeliveryThread)
 				{
 					Tracer.Info("Stopping async delivery thread.");
-					asyncDeliveryThread.Interrupt();
-					if(!asyncDeliveryThread.Join(10000))
+					this.asyncDeliveryThread.Interrupt();
+					if(!this.asyncDeliveryThread.Join(10000))
 					{
 						Tracer.Info("Aborting async delivery thread.");
-						asyncDeliveryThread.Abort();
+						this.asyncDeliveryThread.Abort();
 					}
 
-					asyncDeliveryThread = null;
+					this.asyncDeliveryThread = null;
 					Tracer.Info("Async delivery thread stopped.");
 				}
 			}
@@ -171,20 +171,20 @@ namespace Apache.NMS.ZMQ
 
 		protected virtual void StartAsyncDelivery()
 		{
-			Debug.Assert(null == asyncDeliveryThread);
-			lock(asyncDeliveryLock)
+			Debug.Assert(null == this.asyncDeliveryThread);
+			lock(this.asyncDeliveryLock)
 			{
-				asyncDelivery = true;
-				asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
-				asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);
-				asyncDeliveryThread.IsBackground = true;
-				asyncDeliveryThread.Start();
+				this.asyncDelivery = true;
+				this.asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
+				this.asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);
+				this.asyncDeliveryThread.IsBackground = true;
+				this.asyncDeliveryThread.Start();
 			}
 		}
 
 		protected virtual void DispatchLoop()
 		{
-			Tracer.Info("Starting dispatcher thread consumer: " + this);
+			Tracer.InfoFormat("Starting dispatcher thread consumer: {0}", this.asyncDeliveryThread.Name);
 			TimeSpan receiveWait = TimeSpan.FromSeconds(3);
 
 			while(asyncDelivery)
@@ -214,12 +214,12 @@ namespace Apache.NMS.ZMQ
 					Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
 				}
 			}
-			Tracer.Info("Stopped dispatcher thread consumer: " + this);
+			Tracer.InfoFormat("Stopped dispatcher thread consumer: {0}", this.asyncDeliveryThread.Name);
 		}
 
 		protected virtual void HandleAsyncException(Exception e)
 		{
-			session.Connection.HandleException(e);
+			this.session.Connection.HandleException(e);
 		}
 
 		/// <summary>

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Wed Mar
12 23:19:06 2014
@@ -45,30 +45,30 @@ namespace Apache.NMS.ZMQ
 			set { this.producerTransformer = value; }
 		}
 
-		public MessageProducer(Session session, IDestination dest)
+		public MessageProducer(Session sess, IDestination dest)
 		{
-			if(null == session.Connection.Context)
+			if(null == sess.Connection.Context)
 			{
 				throw new NMSConnectionException();
 			}
 
-			this.session = session;
+			this.session = sess;
 			this.destination = dest;
 		}
 
 		public void Send(IMessage message)
 		{
-			Send(Destination, message);
+			Send(this.Destination, message);
 		}
 
 		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
TimeSpan timeToLive)
 		{
-			Send(Destination, message, deliveryMode, priority, timeToLive);
+			Send(this.Destination, message, deliveryMode, priority, timeToLive);
 		}
 
 		public void Send(IDestination dest, IMessage message)
 		{
-			Send(dest, message, DeliveryMode, Priority, TimeToLive);
+			Send(dest, message, this.DeliveryMode, this.Priority, this.TimeToLive);
 		}
 
 		public void Send(IDestination dest, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority
priority, TimeSpan timeToLive)
@@ -91,10 +91,10 @@ namespace Apache.NMS.ZMQ
 
 			// Prefix the message with the destination name. The client will subscribe to this destination
name
 			// in order to receive messages.
-			Destination destination = (Destination) dest;
+			Destination theDest = (Destination) dest;
 
-			string msg = destination.Name + ((ITextMessage) message).Text;
-			destination.Send(Encoding.UTF8.GetBytes(msg), this.session.Connection.RequestTimeout);
+			string msg = theDest.Name + ((ITextMessage) message).Text;
+			theDest.Send(Encoding.UTF8.GetBytes(msg), this.session.Connection.RequestTimeout);
 		}
 
 		public void Dispose()
@@ -104,58 +104,59 @@ namespace Apache.NMS.ZMQ
 
 		public void Close()
 		{
+			this.destination = null;
 		}
 
 		public IMessage CreateMessage()
 		{
-			return session.CreateMessage();
+			return this.session.CreateMessage();
 		}
 
 		public ITextMessage CreateTextMessage()
 		{
-			return session.CreateTextMessage();
+			return this.session.CreateTextMessage();
 		}
 
 		public ITextMessage CreateTextMessage(String text)
 		{
-			return session.CreateTextMessage(text);
+			return this.session.CreateTextMessage(text);
 		}
 
 		public IMapMessage CreateMapMessage()
 		{
-			return session.CreateMapMessage();
+			return this.session.CreateMapMessage();
 		}
 
 		public IObjectMessage CreateObjectMessage(Object body)
 		{
-			return session.CreateObjectMessage(body);
+			return this.session.CreateObjectMessage(body);
 		}
 
 		public IBytesMessage CreateBytesMessage()
 		{
-			return session.CreateBytesMessage();
+			return this.session.CreateBytesMessage();
 		}
 
 		public IBytesMessage CreateBytesMessage(byte[] body)
 		{
-			return session.CreateBytesMessage(body);
+			return this.session.CreateBytesMessage(body);
 		}
 
 		public IStreamMessage CreateStreamMessage()
 		{
-			return session.CreateStreamMessage();
+			return this.session.CreateStreamMessage();
 		}
 
 		public MsgDeliveryMode DeliveryMode
 		{
-			get { return deliveryMode; }
-			set { deliveryMode = value; }
+			get { return this.deliveryMode; }
+			set { this.deliveryMode = value; }
 		}
 
 		public TimeSpan TimeToLive
 		{
-			get { return timeToLive; }
-			set { timeToLive = value; }
+			get { return this.timeToLive; }
+			set { this.timeToLive = value; }
 		}
 
 		/// <summary>
@@ -169,26 +170,26 @@ namespace Apache.NMS.ZMQ
 
 		public IDestination Destination
 		{
-			get { return destination; }
-			set { destination = value; }
+			get { return this.destination; }
+			set { this.destination = value; }
 		}
 
 		public MsgPriority Priority
 		{
-			get { return priority; }
-			set { priority = value; }
+			get { return this.priority; }
+			set { this.priority = value; }
 		}
 
 		public bool DisableMessageID
 		{
-			get { return disableMessageID; }
-			set { disableMessageID = value; }
+			get { return this.disableMessageID; }
+			set { this.disableMessageID = value; }
 		}
 
 		public bool DisableMessageTimestamp
 		{
-			get { return disableMessageTimestamp; }
-			set { disableMessageTimestamp = value; }
+			get { return this.disableMessageTimestamp; }
+			set { this.disableMessageTimestamp = value; }
 		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs Wed Mar 12 23:19:06
2014
@@ -42,6 +42,14 @@ namespace Apache.NMS.ZMQ
 		}
 
 		#endregion
+
+		/// <summary>
+		/// </summary>
+		/// <returns>string representation of this instance</returns>
+		public override string ToString()
+		{
+			return "queue://" + this.destinationName;
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs Wed Mar
12 23:19:06 2014
@@ -51,5 +51,13 @@ namespace Apache.NMS.ZMQ
 		}
 
 		#endregion
+
+		/// <summary>
+		/// </summary>
+		/// <returns>string representation of this instance</returns>
+		public override string ToString()
+		{
+			return "temp-queue://" + this.destinationName;
+		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs Wed Mar
12 23:19:06 2014
@@ -51,6 +51,14 @@ namespace Apache.NMS.ZMQ
 		}
 
 		#endregion
+
+		/// <summary>
+		/// </summary>
+		/// <returns>string representation of this instance</returns>
+		public override string ToString()
+		{
+			return "temp-topic://" + this.destinationName;
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs Wed Mar 12 23:19:06
2014
@@ -42,6 +42,14 @@ namespace Apache.NMS.ZMQ
 		}
 
 		#endregion
+
+		/// <summary>
+		/// </summary>
+		/// <returns>string representation of this instance</returns>
+		public override string ToString()
+		{
+			return "topic://" + this.destinationName;
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
Wed Mar 12 23:19:06 2014
@@ -26,10 +26,53 @@ namespace Apache.NMS.ZMQ
 	{
 		private int totalMsgCountToReceive = 0;
 
-		private class ConsumerTracker
+		private class ConsumerTracker : IMessageConsumer
 		{
-			public IMessageConsumer consumer;
+			private IMessageConsumer consumer;
 			public int msgCount = 0;
+
+			public ConsumerTracker(ISession session, IDestination testDestination)
+			{
+				this.consumer = session.CreateConsumer(testDestination);
+				Assert.IsNotNull(this.consumer, "Error creating consumer on {0}", testDestination.ToString());
+			}
+
+			public void Close()
+			{
+				this.consumer.Close();
+			}
+
+			public ConsumerTransformerDelegate ConsumerTransformer
+			{
+				get { return this.consumer.ConsumerTransformer; }
+				set { this.consumer.ConsumerTransformer = value; }
+			}
+
+			public event MessageListener Listener
+			{
+				add { this.consumer.Listener += value; }
+				remove { this.consumer.Listener -= value; }
+			}
+
+			public IMessage Receive(TimeSpan timeout)
+			{
+				return this.consumer.Receive(timeout);
+			}
+
+			public IMessage Receive()
+			{
+				return this.consumer.Receive();
+			}
+
+			public IMessage ReceiveNoWait()
+			{
+				return this.consumer.ReceiveNoWait();
+			}
+
+			public void Dispose()
+			{
+				this.consumer.Dispose();
+			}
 		}
 
 		[Test]
@@ -50,86 +93,86 @@ namespace Apache.NMS.ZMQ
 				using(ISession session = connection.CreateSession())
 				{
 					Assert.IsNotNull(session, "Error creating Session.");
-					IDestination testDestination = session.GetDestination(destination);
-					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+					using(IDestination testDestination = session.GetDestination(destination))
+					{
+						Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
 
-					// Track the number of messages we should receive
-					this.totalMsgCountToReceive = numProducers * numConsumers;
+						// Track the number of messages we should receive
+						this.totalMsgCountToReceive = numProducers * numConsumers;
 
-					ConsumerTracker[] consumerTrackers = null;
-					IMessageProducer[] producers = null;
+						ConsumerTracker[] consumerTrackers = null;
+						IMessageProducer[] producers = null;
 
-					try
-					{
-						// Create the consumers
-						consumerTrackers = new ConsumerTracker[numConsumers];
-						for(int index = 0; index < numConsumers; index++)
+						try
 						{
-							ConsumerTracker tracker = new ConsumerTracker();
-							tracker.consumer = session.CreateConsumer(testDestination);
-							Assert.IsNotNull(tracker.consumer, "Error creating consumer #{0} on {1}", index, destination);
-							tracker.consumer.Listener += (message) =>
-								{
-									Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
-									ITextMessage textMsg = (ITextMessage) message;
-									Assert.AreEqual(textMsg.Text, "Zero Message.");
-									tracker.msgCount++;
-								};
-							consumerTrackers[index] = tracker;
-						}
+							// Create the consumers
+							consumerTrackers = new ConsumerTracker[numConsumers];
+							for(int index = 0; index < numConsumers; index++)
+							{
+								ConsumerTracker tracker = new ConsumerTracker(session, testDestination);
+								tracker.Listener += (message) =>
+									{
+										Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
+										ITextMessage textMsg = (ITextMessage) message;
+										Assert.AreEqual(textMsg.Text, "Zero Message.");
+										tracker.msgCount++;
+									};
+								consumerTrackers[index] = tracker;
+							}
 
-						// Create the producers
-						producers = new IMessageProducer[numProducers];
-						for(int index = 0; index < numProducers; index++)
-						{
-							producers[index] = session.CreateProducer(testDestination);
-							Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}", index, destination);
-						}
+							// Create the producers
+							producers = new IMessageProducer[numProducers];
+							for(int index = 0; index < numProducers; index++)
+							{
+								producers[index] = session.CreateProducer(testDestination);
+								Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}", index,
destination);
+							}
 
-						// Send the messages
-						for(int index = 0; index < numProducers; index++)
-						{
-							ITextMessage testMsg = producers[index].CreateTextMessage("Zero Message.");
-							Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.", index);
-							producers[index].Send(testMsg);
-						}
+							// Send the messages
+							for(int index = 0; index < numProducers; index++)
+							{
+								ITextMessage testMsg = session.CreateTextMessage("Zero Message.");
+								Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.", index);
+								producers[index].Send(testMsg);
+							}
 
-						// Wait for the message
-						DateTime startWaitTime = DateTime.Now;
-						TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
+							// Wait for the message
+							DateTime startWaitTime = DateTime.Now;
+							TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);
 
-						while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)
-						{
-							if((DateTime.Now - startWaitTime) > maxWaitTime)
+							while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)
 							{
-								Assert.Fail("Timeout waiting for message receive.");
+								if((DateTime.Now - startWaitTime) > maxWaitTime)
+								{
+									Assert.Fail("Timeout waiting for message receive.");
+								}
+
+								Thread.Sleep(5);
 							}
 
-							Thread.Sleep(5);
+							// Sleep for an extra 2 seconds to see if any extra messages get delivered
+							Thread.Sleep(2 * 1000);
+							Assert.AreEqual(this.totalMsgCountToReceive, GetNumMsgsReceived(consumerTrackers),
"Received too many messages.");
 						}
-
-						// Sleep for an extra 2 seconds to see if any extra messages get delivered
-						Thread.Sleep(2 * 1000);
-						Assert.AreEqual(this.totalMsgCountToReceive, GetNumMsgsReceived(consumerTrackers),
"Received too many messages.");
-					}
-					finally
-					{
-
-						// Clean up the producers
-						if(null != producers)
+						finally
 						{
-							foreach(IMessageProducer producer in producers)
+
+							// Clean up the producers
+							if(null != producers)
 							{
-								producer.Dispose();
+								foreach(IMessageProducer producer in producers)
+								{
+									producer.Dispose();
+								}
 							}
-						}
 
-						// Clean up the consumers
-						if(null != consumerTrackers)
-						{
-							foreach(ConsumerTracker tracker in consumerTrackers)
+							// Clean up the consumers
+							if(null != consumerTrackers)
 							{
-								tracker.consumer.Dispose();
+								foreach(ConsumerTracker tracker in consumerTrackers)
+								{
+									tracker.Dispose();
+								}
 							}
 						}
 					}
@@ -137,6 +180,25 @@ namespace Apache.NMS.ZMQ
 			}
 		}
 
+		[Test]
+		private void SingleProducerMultipleDestinations()
+		{
+			string[] destinations = new string[]
+					{
+						"queue://ZMQTestQueue1",
+						"queue://ZMQTestQueue2",
+						"topic://ZMQTestTopic1",
+						"topic://ZMQTestTopic2",
+						"temp-queue://ZMQTempQueue1",
+						"temp-queue://ZMQTempQueue1",
+						"temp-topic://ZMQTempTopic1",
+						"temp-topic://ZMQTempTopic2"
+					};
+
+			// TODO: Create one producer, and then use it to send to multiple destinations.
+			Assert.Fail("Not implemented.");
+		}
+
 		private int GetNumMsgsReceived(ConsumerTracker[] consumerTrackers)
 		{
 			int numMsgs = 0;

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs?rev=1576997&r1=1576996&r2=1576997&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs Wed Mar 12 23:19:06
2014
@@ -24,7 +24,7 @@ namespace Apache.NMS.ZMQ
 	[TestFixture]
 	public class ZMQTest : BaseTest
 	{
-		private bool receivedTestMessage = true;
+		private bool receivedTestMessage = false;
 
 		[Test]
 		public void TestConnection()
@@ -69,9 +69,11 @@ namespace Apache.NMS.ZMQ
 				using(ISession session = connection.CreateSession())
 				{
 					Assert.IsNotNull(session, "Error creating session.");
-					IDestination testDestination = session.GetDestination(destination);
-					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
-					Assert.IsInstanceOf(destinationType, testDestination, "Wrong destintation type.");
+					using(IDestination testDestination = session.GetDestination(destination))
+					{
+						Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+						Assert.IsInstanceOf(destinationType, testDestination, "Wrong destintation type.");
+					}
 				}
 			}
 		}
@@ -89,12 +91,14 @@ namespace Apache.NMS.ZMQ
 				using(ISession session = connection.CreateSession())
 				{
 					Assert.IsNotNull(session, "Error creating session.");
-					IDestination testDestination = session.GetDestination(destination);
-					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
-					using(IMessageProducer producer = session.CreateProducer(testDestination))
+					using(IDestination testDestination = session.GetDestination(destination))
 					{
-						Assert.IsNotNull(producer, "Error creating producer on {0}", destination);
-						Assert.IsInstanceOf<MessageProducer>(producer, "Wrong producer type.");
+						Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+						using(IMessageProducer producer = session.CreateProducer(testDestination))
+						{
+							Assert.IsNotNull(producer, "Error creating producer on {0}", destination);
+							Assert.IsInstanceOf<MessageProducer>(producer, "Wrong producer type.");
+						}
 					}
 				}
 			}
@@ -113,12 +117,14 @@ namespace Apache.NMS.ZMQ
 				using(ISession session = connection.CreateSession())
 				{
 					Assert.IsNotNull(session, "Error creating session.");
-					IDestination testDestination = session.GetDestination(destination);
-					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
-					using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
+					using(IDestination testDestination = session.GetDestination(destination))
 					{
-						Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
-						Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");
+						Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+						using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
+						{
+							Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+							Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");
+						}
 					}
 				}
 			}
@@ -131,38 +137,42 @@ namespace Apache.NMS.ZMQ
 		{
 			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
 			Assert.IsNotNull(factory, "Error creating connection factory.");
+
+			this.receivedTestMessage = false;
 			using(IConnection connection = factory.CreateConnection())
 			{
 				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with
libzmq and clrzmq ");
 				using(ISession session = connection.CreateSession())
 				{
 					Assert.IsNotNull(session, "Error creating Session.");
-					IDestination testDestination = session.GetDestination(destination);
-					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
-					using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
+					using(IDestination testDestination = session.GetDestination(destination))
 					{
-						Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
-						consumer.Listener += OnMessage;
-						using(IMessageProducer producer = session.CreateProducer(testDestination))
+						Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+						using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
 						{
-							Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
-							ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
-							Assert.IsNotNull(testMsg, "Error creating test message.");
-							producer.Send(testMsg);
-						}
+							Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+							consumer.Listener += OnMessage;
+							using(IMessageProducer producer = session.CreateProducer(testDestination))
+							{
+								Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
+								ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
+								Assert.IsNotNull(testMsg, "Error creating test message.");
+								producer.Send(testMsg);
+							}
 
-						// Wait for the message
-						DateTime startWaitTime = DateTime.Now;
-						TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
+							// Wait for the message
+							DateTime startWaitTime = DateTime.Now;
+							TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);
 
-						while(!receivedTestMessage)
-						{
-							if((DateTime.Now - startWaitTime) > maxWaitTime)
+							while(!receivedTestMessage)
 							{
-								Assert.Fail("Timeout waiting for message receive.");
-							}
+								if((DateTime.Now - startWaitTime) > maxWaitTime)
+								{
+									Assert.Fail("Timeout waiting for message receive.");
+								}
 
-							Thread.Sleep(5);
+								Thread.Sleep(5);
+							}
 						}
 					}
 				}



Mime
View raw message