activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r1579357 - in /activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src: main/csharp/BaseMessage.cs main/csharp/Connection.cs main/csharp/Destination.cs main/csharp/MessageConsumer.cs main/csharp/MessageProducer.cs test/csharp/ZMQTest.cs
Date Wed, 19 Mar 2014 19:09:34 GMT
Author: jgomes
Date: Wed Mar 19 19:09:34 2014
New Revision: 1579357

URL: http://svn.apache.org/r1579357
Log:
Implement disposable pattern for Connection.
Only dispose the producer endpoint after its final release.
Add overloaded send/receive API for destinations.
Initialize the sockets on the correct message handler thread.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs Wed Mar 19 19:09:34 2014
@@ -29,7 +29,7 @@ namespace Apache.NMS.ZMQ
 		private string correlationId;
 		private TimeSpan timeToLive;
 		private string messageId;
-		private MsgDeliveryMode deliveryMode;
+		private MsgDeliveryMode deliveryMode = MsgDeliveryMode.NonPersistent;
 		private MsgPriority priority;
 		private Destination replyTo;
 		private byte[] content;

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs Wed Mar 19 19:09:34 2014
@@ -19,6 +19,7 @@ using System;
 using ZeroMQ;
 using System.Collections.Generic;
 using System.Text;
+using System.Collections;
 
 namespace Apache.NMS.ZMQ
 {
@@ -46,24 +47,87 @@ namespace Apache.NMS.ZMQ
         /// <summary>
         /// ZMQ context
         /// </summary>
+		private static object contextLock = new object();
+		private static int instanceCount = 0;
 		private static ZmqContext _context;
-		private static Dictionary<string, ProducerRef> producerCache;
-		private static object producerCacheLock;
+		private static Dictionary<string, ProducerRef> producerCache = new Dictionary<string, ProducerRef>();
+		private static object producerCacheLock = new object();
+		private TimeSpan zeroTimeout = new TimeSpan(0);
 
-		static Connection()
+		private bool disposed = false;
+
+		private static void InitContext()
+		{
+			lock(contextLock)
+			{
+				if(0 == instanceCount++)
+				{
+					Connection._context = ZmqContext.Create();
+				}
+			}
+		}
+
+		private static void DestroyContext()
 		{
-			Connection._context = ZmqContext.Create();
-			Connection.producerCache = new Dictionary<string, ProducerRef>();
-			Connection.producerCacheLock = new object();
+			lock(contextLock)
+			{
+				if(0 == --instanceCount)
+				{
+					Connection._context.Dispose();
+				}
+			}
 		}
 
 		public Connection(Uri connectionUri)
 		{
+			InitContext();
 			this.brokerUri = connectionUri;
 			this.producerContextBinding = string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port);
 			this.consumerContextBinding = string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host, this.brokerUri.Port);
 		}
 
+		~Connection()
+		{
+			Dispose(false);
+		}
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		private void Dispose(bool disposing)
+		{
+			if(disposed)
+			{
+				return;
+			}
+
+			if(disposing)
+			{
+				try
+				{
+					OnDispose();
+				}
+				catch(Exception ex)
+				{
+					Tracer.ErrorFormat("Exception disposing Connection {0}: {1}", this.brokerUri.AbsoluteUri, ex.Message);
+				}
+			}
+
+			disposed = true;
+		}
+
+		/// <summary>
+		/// Child classes can override this method to perform clean-up logic.
+		/// </summary>
+		protected virtual void OnDispose()
+		{
+			Close();
+			DestroyContext();
+		}
+
 		/// <summary>
         /// Starts message delivery for this connection.
         /// </summary>
@@ -147,12 +211,13 @@ namespace Apache.NMS.ZMQ
 					{
 						producerCache.Remove(contextBinding);
 						producerRef.producer.Unbind(contextBinding);
+						producerRef.producer.Dispose();
 					}
 				}
 			}
 		}
 
-		internal ZmqSocket GetConsumer(Encoding encoding, string destinationName)
+		internal ZmqSocket GetConsumer()
 		{
 			ZmqSocket endpoint = this.Context.CreateSocket(SocketType.SUB);
 
@@ -160,8 +225,6 @@ namespace Apache.NMS.ZMQ
 			{
 				throw new ResourceAllocationException();
 			}
-			endpoint.Subscribe(encoding.GetBytes(destinationName));
-			endpoint.Connect(GetConsumerBindingPath());
 
 			return endpoint;
 		}
@@ -169,6 +232,7 @@ namespace Apache.NMS.ZMQ
 		internal void ReleaseConsumer(ZmqSocket endpoint)
 		{
 			endpoint.Disconnect(GetConsumerBindingPath());
+			endpoint.Dispose();
 		}
 
 		internal string GetProducerContextBinding()
@@ -176,16 +240,11 @@ namespace Apache.NMS.ZMQ
 			return this.producerContextBinding;
 		}
 
-		private string GetConsumerBindingPath()
+		internal string GetConsumerBindingPath()
 		{
 			return this.consumerContextBinding;
 		}
 
-		public void Dispose()
-        {
-            Close();
-        }
-
         public void Close()
         {
             Stop();

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Wed Mar 19 19:09:34 2014
@@ -18,6 +18,7 @@
 using System;
 using System.Text;
 using ZeroMQ;
+using System.Diagnostics;
 
 namespace Apache.NMS.ZMQ
 {
@@ -26,6 +27,8 @@ namespace Apache.NMS.ZMQ
 	/// </summary>
 	public abstract class Destination : IDestination
 	{
+		public static Encoding encoding = Encoding.UTF8;
+
 		protected Session session;
 		/// <summary>
 		/// Socket object
@@ -92,14 +95,12 @@ namespace Apache.NMS.ZMQ
 					this.session.Connection.ReleaseProducer(this.producerEndpoint);
 				}
 
-				this.producerEndpoint.Dispose();
 				this.producerEndpoint = null;
 			}
 
 			if(null != this.consumerEndpoint)
 			{
 				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
-				this.consumerEndpoint.Dispose();
 				this.consumerEndpoint = null;
 			}
 		}
@@ -178,35 +179,82 @@ namespace Apache.NMS.ZMQ
 			get;
 		}
 
-		internal int Send(byte[] buffer, TimeSpan timeout)
+		internal void InitSender()
 		{
 			if(null == this.producerEndpoint)
 			{
 				this.producerEndpoint = this.session.Connection.GetProducer();
 			}
-
-			return this.producerEndpoint.Send(buffer, buffer.Length, SocketFlags.None, timeout);
 		}
 
-		internal string Receive(Encoding encoding, TimeSpan timeout)
+		internal void InitReceiver()
 		{
 			if(null == this.consumerEndpoint)
 			{
-				this.consumerEndpoint = this.session.Connection.GetConsumer(encoding, this.destinationName);
+				Connection connection = this.session.Connection;
+
+				this.consumerEndpoint = connection.GetConsumer();
+				// Must subscribe first before connecting to the endpoint binding
+				this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));
+				this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());
 			}
+		}
+
+		internal void Subscribe(string prefixName)
+		{
+			InitReceiver();
+			this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));
+		}
+
+		internal void Unsubscribe(string prefixName)
+		{
+			if(null != this.consumerEndpoint)
+			{
+				this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));
+			}
+		}
 
-			return consumerEndpoint.Receive(encoding, timeout);
+		internal SendStatus Send(string msg)
+		{
+			Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
+			return this.producerEndpoint.Send(msg, Destination.encoding);
+		}
+
+		internal SendStatus Send(byte[] buffer)
+		{
+			Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send().");
+			return this.producerEndpoint.Send(buffer);
+		}
+
+		internal string ReceiveString(TimeSpan timeout)
+		{
+			this.InitReceiver();
+			return this.consumerEndpoint.Receive(Destination.encoding, timeout);
+		}
+
+		internal byte[] ReceiveBytes(TimeSpan timeout, out int size)
+		{
+			this.InitReceiver();
+			return this.consumerEndpoint.Receive(null, timeout, out size);
+		}
+
+		internal byte[] ReceiveBytes(SocketFlags flags, out int size)
+		{
+			this.InitReceiver();
+			return this.consumerEndpoint.Receive(null, flags, out size);
 		}
 
 		internal Frame ReceiveFrame()
 		{
 			// TODO: Implement
+			this.InitReceiver();
 			return null;
 		}
 
 		internal ZmqMessage ReceiveMessage()
 		{
 			// TODO: Implement
+			this.InitReceiver();
 			return null;
 		}
 	}

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Mar 19 19:09:34 2014
@@ -15,14 +15,10 @@
  * limitations under the License.
  */
 
-#define PUBSUB
-
 using System;
+using System.Diagnostics;
 using System.Text;
 using System.Threading;
-using Apache.NMS.Util;
-using ZeroMQ;
-using System.Diagnostics;
 
 namespace Apache.NMS.ZMQ
 {
@@ -31,7 +27,7 @@ namespace Apache.NMS.ZMQ
 	/// </summary>
 	public class MessageConsumer : IMessageConsumer
 	{
-		protected TimeSpan zeroTimeout = new TimeSpan(0);
+		protected static readonly TimeSpan zeroTimeout = new TimeSpan(0);
 
 		private readonly Session session;
 		private readonly AcknowledgementMode acknowledgementMode;
@@ -41,6 +37,8 @@ namespace Apache.NMS.ZMQ
 		private Thread asyncDeliveryThread = null;
 		private object asyncDeliveryLock = new object();
 		private bool asyncDelivery = false;
+		private bool asyncInit = false;
+		private byte[] rawDestinationName;
 
 		private ConsumerTransformerDelegate consumerTransformer;
 		public ConsumerTransformerDelegate ConsumerTransformer
@@ -60,29 +58,40 @@ namespace Apache.NMS.ZMQ
 
 			this.session = sess;
 			this.destination = (Destination) dest;
+			this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name);
 			this.acknowledgementMode = ackMode;
 		}
 
+		private object listenerLock = new object();
 		public event MessageListener Listener
 		{
 			add
 			{
-				this.listener += value;
-				this.listenerCount++;
-				StartAsyncDelivery();
+				lock(listenerLock)
+				{
+					this.listener += value;
+					if(0 == this.listenerCount)
+					{
+						StartAsyncDelivery();
+					}
+
+					this.listenerCount++;
+				}
 			}
 
 			remove
 			{
-				if(this.listenerCount > 0)
+				lock(listenerLock)
 				{
 					this.listener -= value;
-					this.listenerCount--;
-				}
-
-				if(0 == listenerCount)
-				{
-					StopAsyncDelivery();
+					if(this.listenerCount > 0)
+					{
+						this.listenerCount--;
+						if(0 == this.listenerCount)
+						{
+							StopAsyncDelivery();
+						}
+					}
 				}
 			}
 		}
@@ -106,15 +115,17 @@ namespace Apache.NMS.ZMQ
 		/// </returns>
 		public IMessage Receive(TimeSpan timeout)
 		{
-			// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
-			string msgContent = this.destination.Receive(Encoding.UTF8, timeout);
+			int size;
+			byte[] receivedMsg = this.destination.ReceiveBytes(timeout, out size);
 
-			if(null != msgContent)
+			if(size > 0)
 			{
 				// Strip off the subscribed destination name.
-				string destinationName = this.destination.Name;
-				string messageText = msgContent.Substring(destinationName.Length, msgContent.Length - destinationName.Length);
-				return ToNmsMessage(messageText);
+				// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
+				int msgStart = this.rawDestinationName.Length;
+				int msgLength = receivedMsg.Length - msgStart;
+				string msgContent = Encoding.UTF8.GetString(receivedMsg, msgStart, msgLength);
+				return ToNmsMessage(msgContent);
 			}
 
 			return null;
@@ -150,7 +161,7 @@ namespace Apache.NMS.ZMQ
 
 		protected virtual void StopAsyncDelivery()
 		{
-			lock(asyncDeliveryLock)
+			lock(this.asyncDeliveryLock)
 			{
 				this.asyncDelivery = false;
 				if(null != this.asyncDeliveryThread)
@@ -174,33 +185,49 @@ namespace Apache.NMS.ZMQ
 			Debug.Assert(null == this.asyncDeliveryThread);
 			lock(this.asyncDeliveryLock)
 			{
+				this.asyncInit = false;
 				this.asyncDelivery = true;
-				this.asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
+				this.asyncDeliveryThread = new Thread(new ThreadStart(MsgDispatchLoop));
 				this.asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);
 				this.asyncDeliveryThread.IsBackground = true;
 				this.asyncDeliveryThread.Start();
+				while(!asyncInit)
+				{
+					Thread.Sleep(1);
+				}
 			}
 		}
 
-		protected virtual void DispatchLoop()
+		protected virtual void MsgDispatchLoop()
 		{
 			Tracer.InfoFormat("Starting dispatcher thread consumer: {0}", this.asyncDeliveryThread.Name);
-			TimeSpan receiveWait = TimeSpan.FromSeconds(3);
+			TimeSpan receiveWait = TimeSpan.FromSeconds(2);
+
+			// Signal that this thread has started.
+			asyncInit = true;
 
 			while(asyncDelivery)
 			{
 				try
 				{
 					IMessage message = Receive(receiveWait);
-					if(asyncDelivery && message != null)
+
+					if(asyncDelivery)
 					{
-						try
+						if(null != message)
 						{
-							listener(message);
+							try
+							{
+								listener(message);
+							}
+							catch(Exception ex)
+							{
+								HandleAsyncException(ex);
+							}
 						}
-						catch(Exception ex)
+						else
 						{
-							HandleAsyncException(ex);
+							Thread.Sleep(0);
 						}
 					}
 				}

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Wed Mar 19 19:09:34 2014
@@ -30,9 +30,9 @@ namespace Apache.NMS.ZMQ
 	public class MessageProducer : IMessageProducer
 	{
 		private readonly Session session;
-		private IDestination destination;
+		private Destination destination;
 
-		private MsgDeliveryMode deliveryMode;
+		private MsgDeliveryMode deliveryMode = MsgDeliveryMode.NonPersistent;
 		private TimeSpan timeToLive;
 		private MsgPriority priority;
 		private bool disableMessageID;
@@ -53,17 +53,18 @@ namespace Apache.NMS.ZMQ
 			}
 
 			this.session = sess;
-			this.destination = dest;
+			this.destination = (Destination) dest;
+			this.destination.InitSender();
 		}
 
 		public void Send(IMessage message)
 		{
-			Send(this.Destination, message);
+			Send(this.destination, message);
 		}
 
 		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
 		{
-			Send(this.Destination, message, deliveryMode, priority, timeToLive);
+			Send(this.destination, message, deliveryMode, priority, timeToLive);
 		}
 
 		public void Send(IDestination dest, IMessage message)
@@ -94,7 +95,7 @@ namespace Apache.NMS.ZMQ
 			Destination theDest = (Destination) dest;
 
 			string msg = theDest.Name + ((ITextMessage) message).Text;
-			theDest.Send(Encoding.UTF8.GetBytes(msg), this.session.Connection.RequestTimeout);
+			theDest.Send(msg);
 		}
 
 		public void Dispose()
@@ -168,12 +169,6 @@ namespace Apache.NMS.ZMQ
 			set { }
 		}
 
-		public IDestination Destination
-		{
-			get { return this.destination; }
-			set { this.destination = value; }
-		}
-
 		public MsgPriority Priority
 		{
 			get { return this.priority; }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs Wed Mar 19 19:09:34 2014
@@ -24,7 +24,7 @@ namespace Apache.NMS.ZMQ
 	[TestFixture]
 	public class ZMQTest : BaseTest
 	{
-		private bool receivedTestMessage = false;
+		private int receivedMsgCount = 0;
 
 		[Test]
 		public void TestConnection()
@@ -132,46 +132,62 @@ namespace Apache.NMS.ZMQ
 
 		[Test]
 		public void TestSendReceive(
+			// inproc, ipc, tcp, pgm, or epgm
+			[Values("zmq:tcp://localhost:5556", "zmq:inproc://localhost:5557")]
+			string connectionName,
 			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
-			string destination)
+			string destinationName)
 		{
-			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri(connectionName));
 			Assert.IsNotNull(factory, "Error creating connection factory.");
 
-			this.receivedTestMessage = false;
+			this.receivedMsgCount = 0;
 			using(IConnection connection = factory.CreateConnection())
 			{
 				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
 				using(ISession session = connection.CreateSession())
 				{
 					Assert.IsNotNull(session, "Error creating Session.");
-					using(IDestination testDestination = session.GetDestination(destination))
+					using(IDestination testDestination = session.GetDestination(destinationName))
 					{
-						Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+						Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destinationName);
 						using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
 						{
-							Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
-							consumer.Listener += OnMessage;
-							using(IMessageProducer producer = session.CreateProducer(testDestination))
+							Assert.IsNotNull(consumer, "Error creating consumer on {0}", destinationName);
+							int sendMsgCount = 0;
+							try
 							{
-								Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
-								ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
-								Assert.IsNotNull(testMsg, "Error creating test message.");
-								producer.Send(testMsg);
-							}
-
-							// Wait for the message
-							DateTime startWaitTime = DateTime.Now;
-							TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);
-
-							while(!receivedTestMessage)
-							{
-								if((DateTime.Now - startWaitTime) > maxWaitTime)
+								consumer.Listener += OnMessage;
+								using(IMessageProducer producer = session.CreateProducer(testDestination))
 								{
-									Assert.Fail("Timeout waiting for message receive.");
-								}
+									Assert.IsNotNull(consumer, "Error creating producer on {0}", destinationName);
+									ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
+									Assert.IsNotNull(testMsg, "Error creating test message.");
+
+									// Wait for the message
+									DateTime startWaitTime = DateTime.Now;
+									TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);
+
+									// Continually send the message to compensate for the
+									// slow joiner problem inherent to spinning up the
+									// internal dispatching threads in ZeroMQ.
+									while(this.receivedMsgCount < 1)
+									{
+										++sendMsgCount;
+										producer.Send(testMsg);
+										if((DateTime.Now - startWaitTime) > maxWaitTime)
+										{
+											Assert.Fail("Timeout waiting for message receive.");
+										}
 
-								Thread.Sleep(5);
+										Thread.Sleep(1);
+									}
+								}
+							}
+							finally
+							{
+								consumer.Listener -= OnMessage;
+								Console.WriteLine("Sent {0} msgs.\nReceived {1} msgs", sendMsgCount, this.receivedMsgCount);
 							}
 						}
 					}
@@ -188,7 +204,7 @@ namespace Apache.NMS.ZMQ
 			Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
 			ITextMessage textMsg = (ITextMessage) message;
 			Assert.AreEqual(textMsg.Text, "Zero Message.");
-			receivedTestMessage = true;
+			this.receivedMsgCount++;
 		}
 	}
 }



Mime
View raw message