activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r1575473 - in /activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk: ./ src/main/csharp/ src/test/csharp/
Date Sat, 08 Mar 2014 02:49:55 GMT
Author: jgomes
Date: Sat Mar  8 02:49:55 2014
New Revision: 1575473

URL: http://svn.apache.org/r1575473
Log:
Refactoring to support multiple producers and consumers.  Fixed wire protocol format. Added many new unit tests to validate the refactoring, and to give example usage.

Added:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Utils.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/BaseTest.cs
      - copied, changed from r1573636, activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/FactoryTests.cs
      - copied, changed from r1573636, activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs
      - copied, changed from r1573636, activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TextMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonAssemblyInfo.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonAssemblyInfo.cs Sat Mar  8 02:49:55 2014
@@ -13,7 +13,7 @@ using System.Runtime.InteropServices;
 //------------------------------------------------------------------------------
 
 [assembly: ComVisibleAttribute(false)]
-[assembly: CLSCompliantAttribute(true)]
+[assembly: CLSCompliantAttribute(false)]
 [assembly: AssemblyTitleAttribute("Apache NMS for ZMQ Class Library")]
 [assembly: AssemblyDescriptionAttribute("Apache NMS for ZMQ Class Library (.Net Messaging Library Implementation): An imp" +
     "lementation of the NMS API for ZMQ")]

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs Sat Mar  8 02:49:55 2014
@@ -17,6 +17,8 @@
 
 using System;
 using ZeroMQ;
+using System.Collections.Generic;
+using System.Text;
 
 namespace Apache.NMS.ZMQ
 {
@@ -26,19 +28,43 @@ namespace Apache.NMS.ZMQ
     ///
     public class Connection : IConnection
     {
+		private class ProducerRef
+		{
+			public ZmqSocket producer = null;
+			public int refCount = 1;
+		}
+
         private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
         private IRedeliveryPolicy redeliveryPolicy;
         private ConnectionMetaData metaData = null;
         private bool closed = true;
         private string clientId;
         private Uri brokerUri;
+		private string producerContextBinding;
+		private string consumerContextBinding;
 
         /// <summary>
         /// ZMQ context
         /// </summary>
-		private ZmqContext _context = ZmqContext.Create();
+		private static ZmqContext _context;
+		private static Dictionary<string, ProducerRef> producerCache;
+		private static object producerCacheLock;
+
+		static Connection()
+		{
+			Connection._context = ZmqContext.Create();
+			Connection.producerCache = new Dictionary<string, ProducerRef>();
+			Connection.producerCacheLock = new object();
+		}
+
+		public Connection(Uri connectionUri)
+		{
+			this.brokerUri = connectionUri;
+			this.producerContextBinding = string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port);
+			this.consumerContextBinding = string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host, this.brokerUri.Port);
+		}
 
-        /// <summary>
+		/// <summary>
         /// Starts message delivery for this connection.
         /// </summary>
         public void Start()
@@ -79,7 +105,83 @@ namespace Apache.NMS.ZMQ
             return new Session(this, mode);
         }
 
-        public void Dispose()
+		internal ZmqSocket GetProducer()
+		{
+			ProducerRef producerRef;
+			string contextBinding = GetProducerContextBinding();
+
+			lock(producerCacheLock)
+			{
+				if(!producerCache.TryGetValue(contextBinding, out producerRef))
+				{
+					producerRef = new ProducerRef();
+					producerRef.producer = this.Context.CreateSocket(SocketType.PUB);
+					if(null == producerRef.producer)
+					{
+						throw new ResourceAllocationException();
+					}
+					producerRef.producer.Bind(contextBinding);
+					producerCache.Add(contextBinding, producerRef);
+				}
+				else
+				{
+					producerRef.refCount++;
+				}
+			}
+
+			return producerRef.producer;
+		}
+
+		internal void ReleaseProducer(ZmqSocket endpoint)
+		{
+			// UNREFERENCED_PARAM(endpoint);
+			ProducerRef producerRef;
+			string contextBinding = GetProducerContextBinding();
+
+			lock(producerCacheLock)
+			{
+				if(producerCache.TryGetValue(contextBinding, out producerRef))
+				{
+					producerRef.refCount--;
+					if(producerRef.refCount < 1)
+					{
+						producerCache.Remove(contextBinding);
+						producerRef.producer.Unbind(contextBinding);
+					}
+				}
+			}
+		}
+
+		internal ZmqSocket GetConsumer(Encoding encoding, string destinationName)
+		{
+			ZmqSocket endpoint = this.Context.CreateSocket(SocketType.SUB);
+
+			if(null == endpoint)
+			{
+				throw new ResourceAllocationException();
+			}
+			endpoint.Subscribe(encoding.GetBytes(destinationName));
+			endpoint.Connect(GetConsumerBindingPath());
+
+			return endpoint;
+		}
+
+		internal void ReleaseConsumer(ZmqSocket endpoint)
+		{
+			endpoint.Disconnect(GetConsumerBindingPath());
+		}
+
+		internal string GetProducerContextBinding()
+		{
+			return this.producerContextBinding;
+		}
+
+		private string GetConsumerBindingPath()
+		{
+			return this.consumerContextBinding;
+		}
+
+		public void Dispose()
         {
             Close();
         }
@@ -87,7 +189,17 @@ namespace Apache.NMS.ZMQ
         public void Close()
         {
             Stop();
-        }
+
+			lock(producerCacheLock)
+			{
+				foreach(KeyValuePair<string, ProducerRef> cacheItem in producerCache)
+				{
+					cacheItem.Value.producer.Unbind(cacheItem.Key);
+				}
+
+				producerCache.Clear();
+			}
+		}
 
         public void PurgeTempDestinations()
         {
@@ -114,7 +226,6 @@ namespace Apache.NMS.ZMQ
         public Uri BrokerUri
         {
             get { return brokerUri; }
-            set { brokerUri = value; }
         }
 
         /// <summary>
@@ -154,7 +265,7 @@ namespace Apache.NMS.ZMQ
         /// </summary>
         internal ZmqContext Context
         {
-            get { return _context; }
+            get { return Connection._context; }
         }
 
         /// <summary>

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/ConnectionFactory.cs Sat Mar  8 02:49:55 2014
@@ -16,6 +16,7 @@
  */
 using System;
 using Apache.NMS.Policies;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ZMQ
 {
@@ -25,7 +26,7 @@ namespace Apache.NMS.ZMQ
 	public class ConnectionFactory : IConnectionFactory
 	{
 		private Uri brokerUri;
-		private string clientID;
+		private string clientId;
 		private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
 
 		private const string DEFAULT_BROKER_URL = "tcp://localhost:5556";
@@ -36,20 +37,35 @@ namespace Apache.NMS.ZMQ
 		{
 		}
 
-		public ConnectionFactory(string brokerUri)
-			: this(brokerUri, null)
+		public ConnectionFactory(string rawBrokerUri)
+			: this(rawBrokerUri, null)
 		{
 		}
 
-		public ConnectionFactory(string brokerUri, string clientID)
-			: this(new Uri(brokerUri), clientID)
+		public ConnectionFactory(string rawBrokerUri, string clientID)
+			: this(URISupport.CreateCompatibleUri(rawBrokerUri), clientID)
 		{
 		}
 
-		public ConnectionFactory(Uri brokerUri, string clientID)
+		public ConnectionFactory(Uri rawBrokerUri)
+			: this(rawBrokerUri, null)
 		{
-			this.brokerUri = brokerUri;
-			this.clientID = clientID;
+		}
+
+		public ConnectionFactory(Uri rawBrokerUri, string clientID)
+		{
+			this.BrokerUri = rawBrokerUri;
+			if(this.BrokerUri.Port < 1)
+			{
+				throw new NMSConnectionException("Missing connection port number.");
+			}
+
+			if(null == clientID)
+			{
+				clientID = Guid.NewGuid().ToString();
+			}
+
+			this.ClientId = clientID;
 		}
 
 		/// <summary>
@@ -93,17 +109,13 @@ namespace Apache.NMS.ZMQ
 		/// </summary>
 		public IConnection CreateConnection(string userName, string password, bool useLogging)
 		{
-			IConnection ReturnValue = null;
-			Connection connection = new Connection();
+			Connection connection = new Connection(this.BrokerUri);
 
 			connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
 			connection.ConsumerTransformer = this.consumerTransformer;
 			connection.ProducerTransformer = this.producerTransformer;
-			connection.BrokerUri = this.BrokerUri;
-			connection.ClientId = this.clientID;
-			ReturnValue = connection;
-
-			return ReturnValue;
+			connection.ClientId = this.ClientId;
+			return connection;
 		}
 
 		/// <summary>
@@ -111,8 +123,18 @@ namespace Apache.NMS.ZMQ
 		/// </summary>
 		public Uri BrokerUri
 		{
-			get { return brokerUri; }
-			set { brokerUri = value; }
+			get { return this.brokerUri; }
+			set
+			{
+				Tracer.InfoFormat("BrokerUri set {0}", value.OriginalString);
+				this.brokerUri = new Uri(URISupport.StripPrefix(value.OriginalString, "zmq:"));
+			}
+		}
+
+		public string ClientId
+		{
+			get { return this.clientId; }
+			set { this.clientId = value; }
 		}
 
 		/// <summary>

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Sat Mar  8 02:49:55 2014
@@ -16,6 +16,8 @@
  */
 
 using System;
+using System.Text;
+using ZeroMQ;
 
 namespace Apache.NMS.ZMQ
 {
@@ -24,40 +26,45 @@ namespace Apache.NMS.ZMQ
 	/// </summary>
 	public abstract class Destination : IDestination
 	{
-
-		private String name = "";
-
+		protected Session session;
 		/// <summary>
-		/// The Default Constructor
+		/// Socket object
 		/// </summary>
-		protected Destination()
-		{
-		}
+		protected ZmqSocket producerEndpoint = null;
+		protected ZmqSocket consumerEndpoint = null;
+		protected string destinationName;
 
 		/// <summary>
 		/// Construct the Destination with a defined physical name.
 		/// </summary>
 		/// <param name="name"></param>
-		protected Destination(String destName)
+		protected Destination(Session session, string destName)
 		{
-			Name = destName;
+			this.session = session;
+			this.destinationName = destName;
 		}
 
-		public String Name
+		~Destination()
 		{
-			get { return this.name; }
-			set
+			// TODO: Implement IDisposable pattern
+			if(null != this.producerEndpoint)
 			{
-				this.name = value;
-				if(!this.name.Contains("\\"))
-				{
-					// Destinations must have paths in them.  If no path specified, then
-					// default to local machine.
-					this.name = ".\\" + this.name;
-				}
+				this.session.Connection.ReleaseProducer(this.producerEndpoint);
+				this.producerEndpoint = null;
+			}
+
+			if(null != this.consumerEndpoint)
+			{
+				this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
+				this.consumerEndpoint = null;
 			}
 		}
 
+		public string Name
+		{
+			get { return this.destinationName; }
+		}
+
 		public bool IsTopic
 		{
 			get
@@ -88,21 +95,26 @@ namespace Apache.NMS.ZMQ
 		/// <summary>
 		/// </summary>
 		/// <returns>string representation of this instance</returns>
-		public override String ToString()
+		public override string ToString()
+		{
+			return MakeUriString(this.destinationName);
+		}
+
+		private string MakeUriString(string destName)
 		{
 			switch(DestinationType)
 			{
 			case DestinationType.Topic:
-				return "topic://" + Name;
+				return "topic://" + destName;
 
 			case DestinationType.TemporaryTopic:
-				return "temp-topic://" + Name;
+				return "temp-topic://" + destName;
 
 			case DestinationType.TemporaryQueue:
-				return "temp-queue://" + Name;
+				return "temp-queue://" + destName;
 
 			default:
-				return "queue://" + Name;
+				return "queue://" + destName;
 			}
 		}
 
@@ -114,10 +126,7 @@ namespace Apache.NMS.ZMQ
 		{
 			int answer = 37;
 
-			if(this.name != null)
-			{
-				answer = name.GetHashCode();
-			}
+			answer = this.Name.GetHashCode();
 
 			if(IsTopic)
 			{
@@ -140,7 +149,7 @@ namespace Apache.NMS.ZMQ
 			{
 				Destination other = (Destination) obj;
 				result = (this.DestinationType == other.DestinationType
-							&& this.name.Equals(other.name));
+							&& this.Name.Equals(other.Name));
 			}
 
 			return result;
@@ -150,6 +159,38 @@ namespace Apache.NMS.ZMQ
 		{
 			get;
 		}
+
+		internal int Send(byte[] buffer, TimeSpan timeout)
+		{
+			if(null == this.producerEndpoint)
+			{
+				this.producerEndpoint = this.session.Connection.GetProducer();
+			}
+
+			return this.producerEndpoint.Send(buffer, buffer.Length, SocketFlags.None, timeout);
+		}
+
+		internal string Receive(Encoding encoding, TimeSpan timeout)
+		{
+			if(null == this.consumerEndpoint)
+			{
+				this.consumerEndpoint = this.session.Connection.GetConsumer(encoding, this.destinationName);
+			}
+
+			return consumerEndpoint.Receive(encoding, timeout);
+		}
+
+		internal Frame ReceiveFrame()
+		{
+			// TODO: Implement
+			return null;
+		}
+
+		internal ZmqMessage ReceiveMessage()
+		{
+			// TODO: Implement
+			return null;
+		}
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Sat Mar  8 02:49:55 2014
@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
+#define PUBSUB
+
 using System;
 using System.Text;
 using System.Threading;
 using Apache.NMS.Util;
 using ZeroMQ;
-//using ZSendRecvOpt = ZMQ.SendRecvOpt;
-//using ZSocketType = ZeroMQ.SocketType;
+using System.Diagnostics;
 
 namespace Apache.NMS.ZMQ
 {
@@ -34,20 +35,12 @@ namespace Apache.NMS.ZMQ
 
 		private readonly Session session;
 		private readonly AcknowledgementMode acknowledgementMode;
-		/// <summary>
-		/// Socket object
-		/// </summary>
-		private ZmqSocket messageSubscriber = null;
-		/// <summary>
-		/// Context binding string
-		/// </summary>
-		private string contextBinding;
-		private Queue destination;
+		private Destination destination;
 		private event MessageListener listener;
 		private int listenerCount = 0;
 		private Thread asyncDeliveryThread = null;
-		private AutoResetEvent pause = new AutoResetEvent(false);
-		private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+		private object asyncDeliveryLock = new object();
+		private bool asyncDelivery = false;
 
 		private ConsumerTransformerDelegate consumerTransformer;
 		public ConsumerTransformerDelegate ConsumerTransformer
@@ -56,39 +49,18 @@ namespace Apache.NMS.ZMQ
 			set { this.consumerTransformer = value; }
 		}
 
-		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination destination, string selector)
+		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination dest, string selector)
 		{
+			// UNUSED_PARAM(selector);		// Selectors are not currently supported
+
 			if(null == session.Connection.Context)
 			{
 				throw new NMSConnectionException();
 			}
 
 			this.session = session;
+			this.destination = (Destination) dest;
 			this.acknowledgementMode = acknowledgementMode;
-			this.messageSubscriber = session.Connection.Context.CreateSocket(SocketType.SUB);
-			if(null == this.messageSubscriber)
-			{
-				throw new ResourceAllocationException();
-			}
-
-			string clientId = session.Connection.ClientId;
-
-			this.contextBinding = session.Connection.BrokerUri.LocalPath;
-			this.destination = new Queue(this.contextBinding);
-			if(!string.IsNullOrEmpty(clientId))
-			{
-				this.messageSubscriber.Identity = Encoding.Unicode.GetBytes(clientId);
-			}
-
-			this.messageSubscriber.Connect(contextBinding);
-			byte[] prefix = null;
-
-			if(!string.IsNullOrWhiteSpace(selector))
-			{
-				prefix = Encoding.ASCII.GetBytes(selector);
-			}
-
-			this.messageSubscriber.Subscribe(prefix);
 		}
 
 		public event MessageListener Listener
@@ -123,8 +95,7 @@ namespace Apache.NMS.ZMQ
 		/// </returns>
 		public IMessage Receive()
 		{
-			// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
-			return ToNmsMessage(messageSubscriber.Receive(Encoding.ASCII));
+			return Receive(TimeSpan.MaxValue);
 		}
 
 		/// <summary>
@@ -136,7 +107,17 @@ namespace Apache.NMS.ZMQ
 		public IMessage Receive(TimeSpan timeout)
 		{
 			// TODO: Support decoding of all message types + all meta data (e.g., headers and properties)
-			return ToNmsMessage(messageSubscriber.Receive(Encoding.ASCII, timeout));
+			string msgContent = this.destination.Receive(Encoding.UTF8, timeout);
+
+			if(null != msgContent)
+			{
+				// Strip off the subscribed destination name.
+				string destinationName = this.destination.Name;
+				string messageText = msgContent.Substring(destinationName.Length, msgContent.Length - destinationName.Length);
+				return ToNmsMessage(messageText);
+			}
+
+			return null;
 		}
 
 		/// <summary>
@@ -164,21 +145,18 @@ namespace Apache.NMS.ZMQ
 		public void Close()
 		{
 			StopAsyncDelivery();
-			if(null != messageSubscriber)
-			{
-				messageSubscriber.Dispose();
-				messageSubscriber = null;
-			}
+			this.destination = null;
 		}
 
 		protected virtual void StopAsyncDelivery()
 		{
-			if(asyncDelivery.CompareAndSet(true, false))
+			lock(asyncDeliveryLock)
 			{
+				asyncDelivery = false;
 				if(null != asyncDeliveryThread)
 				{
 					Tracer.Info("Stopping async delivery thread.");
-					pause.Set();
+					asyncDeliveryThread.Interrupt();
 					if(!asyncDeliveryThread.Join(10000))
 					{
 						Tracer.Info("Aborting async delivery thread.");
@@ -193,10 +171,12 @@ namespace Apache.NMS.ZMQ
 
 		protected virtual void StartAsyncDelivery()
 		{
-			if(asyncDelivery.CompareAndSet(false, true))
+			Debug.Assert(null == asyncDeliveryThread);
+			lock(asyncDeliveryLock)
 			{
+				asyncDelivery = true;
 				asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
-				asyncDeliveryThread.Name = "Message Consumer Dispatch: " + contextBinding;
+				asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name);
 				asyncDeliveryThread.IsBackground = true;
 				asyncDeliveryThread.Start();
 			}
@@ -205,21 +185,22 @@ namespace Apache.NMS.ZMQ
 		protected virtual void DispatchLoop()
 		{
 			Tracer.Info("Starting dispatcher thread consumer: " + this);
+			TimeSpan receiveWait = TimeSpan.FromSeconds(3);
 
-			while(asyncDelivery.Value)
+			while(asyncDelivery)
 			{
 				try
 				{
-					IMessage message = Receive();
-					if(asyncDelivery.Value && message != null)
+					IMessage message = Receive(receiveWait);
+					if(asyncDelivery && message != null)
 					{
 						try
 						{
 							listener(message);
 						}
-						catch(Exception e)
+						catch(Exception ex)
 						{
-							HandleAsyncException(e);
+							HandleAsyncException(ex);
 						}
 					}
 				}
@@ -233,7 +214,7 @@ namespace Apache.NMS.ZMQ
 					Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
 				}
 			}
-			Tracer.Info("Stopping dispatcher thread consumer: " + this);
+			Tracer.Info("Stopped dispatcher thread consumer: " + this);
 		}
 
 		protected virtual void HandleAsyncException(Exception e)
@@ -252,6 +233,7 @@ namespace Apache.NMS.ZMQ
 		/// </returns>
 		protected virtual IMessage ToNmsMessage(string messageText)
 		{
+			// Strip off the destination name prefix.
 			IMessage nmsMessage = new TextMessage(messageText);
 
 			try

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Sat Mar  8 02:49:55 2014
@@ -15,10 +15,13 @@
  * limitations under the License.
  */
 
+#define PUBSUB
+
 using System;
 using System.Text;
 using ZeroMQ;
 
+
 namespace Apache.NMS.ZMQ
 {
 	/// <summary>
@@ -29,10 +32,6 @@ namespace Apache.NMS.ZMQ
 		private readonly Session session;
 		private IDestination destination;
 
-		/// <summary>
-		/// Socket object
-		/// </summary>
-		private ZmqSocket messageProducer = null;
 		private MsgDeliveryMode deliveryMode;
 		private TimeSpan timeToLive;
 		private MsgPriority priority;
@@ -46,24 +45,15 @@ namespace Apache.NMS.ZMQ
 			set { this.producerTransformer = value; }
 		}
 
-		public MessageProducer(Connection connection, Session session, IDestination destination)
+		public MessageProducer(Session session, IDestination dest)
 		{
-			if(null == connection.Context)
+			if(null == session.Connection.Context)
 			{
 				throw new NMSConnectionException();
 			}
 
 			this.session = session;
-			this.destination = destination;
-			this.messageProducer = connection.Context.CreateSocket(SocketType.SUB);
-
-			string clientId = connection.ClientId;
-			if(!string.IsNullOrEmpty(clientId))
-			{
-				this.messageProducer.Identity = Encoding.Unicode.GetBytes(clientId);
-			}
-
-			this.messageProducer.Connect(connection.BrokerUri.LocalPath);
+			this.destination = dest;
 		}
 
 		public void Send(IMessage message)
@@ -76,13 +66,17 @@ namespace Apache.NMS.ZMQ
 			Send(Destination, message, deliveryMode, priority, timeToLive);
 		}
 
-		public void Send(IDestination destination, IMessage message)
+		public void Send(IDestination dest, IMessage message)
 		{
-			Send(destination, message, DeliveryMode, Priority, TimeToLive);
+			Send(dest, message, DeliveryMode, Priority, TimeToLive);
 		}
 
-		public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+		public void Send(IDestination dest, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
 		{
+			// UNUSED_PARAM(deliveryMode);	// No concept of different delivery modes in ZMQ
+			// UNUSED_PARAM(priority);		// No concept of priority messages in ZMQ
+			// UNUSED_PARAM(timeToLive);	// No concept of time-to-live in ZMQ
+
 			if(null != this.ProducerTransformer)
 			{
 				IMessage transformedMessage = ProducerTransformer(this.session, this, message);
@@ -94,7 +88,13 @@ namespace Apache.NMS.ZMQ
 			}
 
 			// TODO: Support encoding of all message types + all meta data (e.g., headers and properties)
-			messageProducer.Send(((ITextMessage) message).Text, Encoding.ASCII);
+
+			// Prefix the message with the destination name. The client will subscribe to this destination name
+			// in order to receive messages.
+			Destination destination = (Destination) dest;
+
+			string msg = destination.Name + ((ITextMessage) message).Text;
+			destination.Send(Encoding.UTF8.GetBytes(msg), this.session.Connection.RequestTimeout);
 		}
 
 		public void Dispose()
@@ -104,11 +104,6 @@ namespace Apache.NMS.ZMQ
 
 		public void Close()
 		{
-			if(null != messageProducer)
-			{
-				messageProducer.Dispose();
-				messageProducer = null;
-			}
 		}
 
 		public IMessage CreateMessage()

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Queue.cs Sat Mar  8 02:49:55 2014
@@ -24,13 +24,8 @@ namespace Apache.NMS.ZMQ
 	/// </summary>
 	public class Queue : Destination, IQueue
 	{
-		public Queue()
-			: base()
-		{
-		}
-
-		public Queue(String name)
-			: base(name)
+		public Queue(Session session, string name)
+			: base(session, name)
 		{
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs Sat Mar  8 02:49:55 2014
@@ -61,7 +61,7 @@ namespace Apache.NMS.ZMQ
 
         public IMessageProducer CreateProducer(IDestination destination)
         {
-            return new MessageProducer(connection, this, destination);
+            return new MessageProducer(this, destination);
         }
         #endregion
 
@@ -106,22 +106,22 @@ namespace Apache.NMS.ZMQ
 
         public IQueue GetQueue(string name)
         {
-            return new Queue(name);
+            return new Queue(this, name);
         }
 
         public ITopic GetTopic(string name)
         {
-            return new Topic(name);
+			return new Topic(this, name);
         }
 
         public ITemporaryQueue CreateTemporaryQueue()
         {
-            return new TemporaryQueue();
+            return new TemporaryQueue(this);
         }
 
         public ITemporaryTopic CreateTemporaryTopic()
         {
-            return new TemporaryTopic();
+            return new TemporaryTopic(this);
         }
 
         /// <summary>

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs Sat Mar  8 02:49:55 2014
@@ -24,13 +24,8 @@ namespace Apache.NMS.ZMQ
 	/// </summary>
 	public class TemporaryQueue : Destination, ITemporaryQueue
 	{
-		public TemporaryQueue()
-			: base()
-		{
-		}
-
-		public TemporaryQueue(String name)
-			: base(name)
+		public TemporaryQueue(Session session)
+			: base(session, Guid.NewGuid().ToString())
 		{
 		}
 
@@ -39,20 +34,20 @@ namespace Apache.NMS.ZMQ
 			get { return DestinationType.TemporaryQueue; }
 		}
 
-		#region ITemporaryQueue Members
+		#region IQueue Members
 
-		public void Delete()
+		public string QueueName
 		{
-			// Nothing to delete.  Resources are cleaned up automatically.
+			get { return Name; }
 		}
 
 		#endregion
 
-		#region IQueue Members
+		#region ITemporaryQueue Members
 
-		public string QueueName
+		public void Delete()
 		{
-			get { return Name; }
+			// Nothing to delete.  Resources are cleaned up automatically.
 		}
 
 		#endregion

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs Sat Mar  8 02:49:55 2014
@@ -24,13 +24,8 @@ namespace Apache.NMS.ZMQ
 	/// </summary>
 	public class TemporaryTopic : Destination, ITemporaryTopic
 	{
-		public TemporaryTopic()
-			: base()
-		{
-		}
-
-		public TemporaryTopic(String name)
-			: base(name)
+		public TemporaryTopic(Session session)
+			: base(session, Guid.NewGuid().ToString())
 		{
 		}
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TextMessage.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TextMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TextMessage.cs Sat Mar  8 02:49:55 2014
@@ -16,13 +16,12 @@
  */
 
 using System;
+using System.Text;
 
 namespace Apache.NMS.ZMQ
 {
 	public class TextMessage : BaseMessage, ITextMessage
 	{
-		public const int SIZE_OF_INT = 4; // sizeof(int) - though causes unsafe issues with net 1.1
-
 		private String text;
 
 		public TextMessage()
@@ -34,68 +33,13 @@ namespace Apache.NMS.ZMQ
 			this.Text = text;
 		}
 
-
 		// Properties
 
 		public string Text
 		{
-			get
-			{
-				if(text == null)
-				{
-					// now lets read the content
-					byte[] data = this.Content;
-					if(data != null)
-					{
-						// TODO assume that the text is ASCII
-						char[] chars = new char[data.Length - SIZE_OF_INT];
-						for(int i = 0; i < chars.Length; i++)
-						{
-							chars[i] = (char) data[i + SIZE_OF_INT];
-						}
-						text = new String(chars);
-					}
-				}
-				return text;
-			}
-
-			set
-			{
-				this.text = value;
-				byte[] data = null;
-				if(text != null)
-				{
-					// TODO assume that the text is ASCII
-
-					byte[] sizePrefix = System.BitConverter.GetBytes(text.Length);
-					data = new byte[text.Length + sizePrefix.Length];  //int at the front of it
-
-					// add the size prefix
-					for(int j = 0; j < sizePrefix.Length; j++)
-					{
-						// The bytes need to be encoded in big endian
-						if(BitConverter.IsLittleEndian)
-						{
-							data[j] = sizePrefix[sizePrefix.Length - j - 1];
-						}
-						else
-						{
-							data[j] = sizePrefix[j];
-						}
-					}
-
-					// Add the data.
-					char[] chars = text.ToCharArray();
-					for(int i = 0; i < chars.Length; i++)
-					{
-						data[i + sizePrefix.Length] = (byte) chars[i];
-					}
-				}
-				this.Content = data;
-
-			}
+			get { return text; }
+			set { this.text = value; }
 		}
-
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Topic.cs Sat Mar  8 02:49:55 2014
@@ -24,13 +24,8 @@ namespace Apache.NMS.ZMQ
 	/// </summary>
 	public class Topic : Destination, ITopic
 	{
-		public Topic()
-			: base()
-		{
-		}
-
-		public Topic(String name)
-			: base(name)
+		public Topic(Session session, String name)
+			: base(session, name)
 		{
 		}
 

Added: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Utils.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Utils.cs?rev=1575473&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Utils.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Utils.cs Sat Mar  8 02:49:55 2014
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Apache.NMS.ZMQ
+{
+	public class ZMQUtils
+	{
+
+		public static string GetDestinationName(IDestination destination)
+		{
+			switch(destination.DestinationType)
+			{
+			case DestinationType.Topic: return ((Topic) destination).TopicName;
+			case DestinationType.Queue: return ((Queue) destination).QueueName;
+			case DestinationType.TemporaryTopic: return ((TemporaryTopic) destination).TopicName;
+			case DestinationType.TemporaryQueue: return ((TemporaryQueue) destination).QueueName;
+			default: return string.Empty;
+			}
+		}
+	}
+}

Copied: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/BaseTest.cs (from r1573636, activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs)
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/BaseTest.cs?p2=activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/BaseTest.cs&p1=activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs&r1=1573636&r2=1575473&rev=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/BaseTest.cs Sat Mar  8 02:49:55 2014
@@ -25,30 +25,15 @@ namespace Apache.NMS.ZMQ
 	/// <summary>
 	/// Use to test and verify ZMQ behavior
 	/// </summary>
-	[TestFixture]
-	public class ZMQTest
+	public class BaseTest
 	{
-		private bool receivedTestMessage = true;
-
-		[SetUp]
-		public void SetUp()
-		{
-			// Setup before each test
-		}
-
-		[TearDown]
-		public void TearDown()
-		{
-			// Clean up after each test
-		}
-
-		[Test]
-		public void TestReceive()
+		[TestFixtureSetUp]
+		public void TestFixtureSetup()
 		{
 			////////////////////////////
 			// Dependencies check
 			////////////////////////////
-			string libFolder = System.Environment.CurrentDirectory;
+			string libFolder = Environment.CurrentDirectory;
 			string libFileName;
 
 			libFileName = Path.Combine(libFolder, "clrzmq.dll");
@@ -61,82 +46,18 @@ namespace Apache.NMS.ZMQ
 			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS library file: {0}", libFileName);
 			libFileName = Path.Combine(libFolder, "Apache.NMS.ZMQ.dll");
 			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS.ZMQ library file: {0}", libFileName);
+		}
 
-			////////////////////////////
-			// Factory check
-			////////////////////////////
-			IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556", "");
-			Assert.IsNotNull(factory, "Error creating connection factory.");
-
-			////////////////////////////
-			// Connection check
-			////////////////////////////
-			IConnection connection = null;
-			try
-			{
-				connection = factory.CreateConnection();
-				Assert.IsNotNull(connection, "problem creating connection class, usually problem with libzmq and clrzmq ");
-			}
-			catch(System.Exception ex1)
-			{
-				Assert.Fail("Problem creating connection, make sure dependencies are present. Error: {0}", ex1.Message);
-			}
-
-			////////////////////////////
-			// Session check
-			////////////////////////////
-			ISession session = connection.CreateSession();
-			// Is session good?
-			Assert.IsNotNull(session, "Error creating Session.");
-
-			////////////////////////////
-			// Consumer check
-			////////////////////////////
-			IQueue testQueue = session.GetQueue("ZMQTestQueue");
-			Assert.IsNotNull(testQueue, "Error creating test queue.");
-			IMessageConsumer consumer = session.CreateConsumer(testQueue);
-			Assert.IsNotNull(consumer, "Error creating consumer.");
-
-			consumer.Listener += OnMessage;
-
-			////////////////////////////
-			// Producer check
-			////////////////////////////
-			IMessageProducer producer = session.CreateProducer(testQueue);
-			Assert.IsNotNull(consumer, "Error creating producer.");
-
-			ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
-			Assert.IsNotNull(testMsg, "Error creating test message.");
-
-			producer.Send(testMsg);
-
-			////////////////////////////
-			// Listener check
-			////////////////////////////
-			DateTime startWaitTime = DateTime.Now;
-			TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
-
-			while(!receivedTestMessage)
-			{
-				if((DateTime.Now - startWaitTime) > maxWaitTime)
-				{
-					Assert.Fail("Timeout waiting for message receive.");
-				}
-
-				Thread.Sleep(5);
-			}
+		[SetUp]
+		public void SetUp()
+		{
+			// Setup before each test
 		}
 
-		/// <summary>
-		/// Receive messages sent to consumer.
-		/// </summary>
-		/// <param name="message"></param>
-		private void OnMessage(IMessage message)
+		[TearDown]
+		public void TearDown()
 		{
-			Assert.IsInstanceOf<ITextMessage>(message, "Wrong message type received.");
-			ITextMessage textMsg = (ITextMessage) message;
-			Assert.AreEqual(textMsg.Text, "Zero Message.");
-			receivedTestMessage = true;
+			// Clean up after each test
 		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/CommonAssemblyInfo.cs Sat Mar  8 02:49:55 2014
@@ -13,7 +13,7 @@ using System.Runtime.InteropServices;
 //------------------------------------------------------------------------------
 
 [assembly: ComVisibleAttribute(false)]
-[assembly: CLSCompliantAttribute(true)]
+[assembly: CLSCompliantAttribute(false)]
 [assembly: AssemblyTitleAttribute("Apache NMS for ZMQ Class Library")]
 [assembly: AssemblyDescriptionAttribute("Apache NMS for ZMQ Class Library (.Net Messaging Library Implementation): An imp" +
     "lementation of the NMS API for ZMQ")]

Copied: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/FactoryTests.cs (from r1573636, activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs)
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/FactoryTests.cs?p2=activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/FactoryTests.cs&p1=activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs&r1=1573636&r2=1575473&rev=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/FactoryTests.cs Sat Mar  8 02:49:55 2014
@@ -16,130 +16,81 @@
  */
 
 using System;
-using System.IO;
-using System.Threading;
 using NUnit.Framework;
 
 namespace Apache.NMS.ZMQ
 {
-	/// <summary>
-	/// Use to test and verify ZMQ behavior
-	/// </summary>
 	[TestFixture]
-	public class ZMQTest
+	public class FactoryTests : BaseTest
 	{
-		private bool receivedTestMessage = true;
+		[Test]
+		public void TestFactory()
+		{
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			Assert.IsInstanceOf<ConnectionFactory>(factory, "Wrong factory type.");
+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+		}
 
-		[SetUp]
-		public void SetUp()
+		[Test]
+		public void TestFactoryClientId()
 		{
-			// Setup before each test
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"), "MyClientId");
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			Assert.IsInstanceOf<ConnectionFactory>(factory, "Wrong factory type.");
+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+			ConnectionFactory zmqConnectionFactory = (ConnectionFactory) factory;
+			Assert.AreEqual(zmqConnectionFactory.ClientId, "MyClientId", "Wrong client Id.");
 		}
 
-		[TearDown]
-		public void TearDown()
-		{
-			// Clean up after each test
-		}
-
-		[Test]
-		public void TestReceive()
-		{
-			////////////////////////////
-			// Dependencies check
-			////////////////////////////
-			string libFolder = System.Environment.CurrentDirectory;
-			string libFileName;
-
-			libFileName = Path.Combine(libFolder, "clrzmq.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing zmq wrapper file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "libzmq.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing zmq library file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "libzmq64.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing 64-bit zmq library file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "Apache.NMS.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS library file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "Apache.NMS.ZMQ.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS.ZMQ library file: {0}", libFileName);
-
-			////////////////////////////
-			// Factory check
-			////////////////////////////
-			IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556", "");
-			Assert.IsNotNull(factory, "Error creating connection factory.");
-
-			////////////////////////////
-			// Connection check
-			////////////////////////////
-			IConnection connection = null;
-			try
-			{
-				connection = factory.CreateConnection();
-				Assert.IsNotNull(connection, "problem creating connection class, usually problem with libzmq and clrzmq ");
-			}
-			catch(System.Exception ex1)
-			{
-				Assert.Fail("Problem creating connection, make sure dependencies are present. Error: {0}", ex1.Message);
-			}
-
-			////////////////////////////
-			// Session check
-			////////////////////////////
-			ISession session = connection.CreateSession();
-			// Is session good?
-			Assert.IsNotNull(session, "Error creating Session.");
-
-			////////////////////////////
-			// Consumer check
-			////////////////////////////
-			IQueue testQueue = session.GetQueue("ZMQTestQueue");
-			Assert.IsNotNull(testQueue, "Error creating test queue.");
-			IMessageConsumer consumer = session.CreateConsumer(testQueue);
-			Assert.IsNotNull(consumer, "Error creating consumer.");
-
-			consumer.Listener += OnMessage;
-
-			////////////////////////////
-			// Producer check
-			////////////////////////////
-			IMessageProducer producer = session.CreateProducer(testQueue);
-			Assert.IsNotNull(consumer, "Error creating producer.");
-
-			ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
-			Assert.IsNotNull(testMsg, "Error creating test message.");
-
-			producer.Send(testMsg);
-
-			////////////////////////////
-			// Listener check
-			////////////////////////////
-			DateTime startWaitTime = DateTime.Now;
-			TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
-
-			while(!receivedTestMessage)
-			{
-				if((DateTime.Now - startWaitTime) > maxWaitTime)
-				{
-					Assert.Fail("Timeout waiting for message receive.");
-				}
-
-				Thread.Sleep(5);
-			}
-		}
-
-		/// <summary>
-		/// Receive messages sent to consumer.
-		/// </summary>
-		/// <param name="message"></param>
-		private void OnMessage(IMessage message)
-		{
-			Assert.IsInstanceOf<ITextMessage>(message, "Wrong message type received.");
-			ITextMessage textMsg = (ITextMessage) message;
-			Assert.AreEqual(textMsg.Text, "Zero Message.");
-			receivedTestMessage = true;
+		[Test, ExpectedException(typeof(NMSConnectionException))]
+		public void TestFactoryUriMissingPort()
+		{
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost"));
 		}
-	}
-}
 
+		[Test]
+		public void TestFactoryDefault()
+		{
+			IConnectionFactory factory = new ConnectionFactory();
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong default port.");
+		}
 
+		[Test]
+		public void TestFactoryDirectString()
+		{
+			IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556");
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+		}
 
+		[Test]
+		public void TestFactoryDirectStringClientId()
+		{
+			IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556", "DirectClientId");
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+			ConnectionFactory zmqConnectionFactory = (ConnectionFactory) factory;
+			Assert.AreEqual(zmqConnectionFactory.ClientId, "DirectClientId", "Wrong client Id.");
+		}
+
+		[Test]
+		public void TestFactoryDirectUri()
+		{
+			IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:5556"));
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+		}
+
+		[Test]
+		public void TestFactoryDirectUriClientId()
+		{
+			IConnectionFactory factory = new ConnectionFactory(new Uri("tcp://localhost:5556"), "DirectClientId");
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			Assert.AreEqual(factory.BrokerUri.Port, 5556, "Wrong port.");
+			ConnectionFactory zmqConnectionFactory = (ConnectionFactory) factory;
+			Assert.AreEqual(zmqConnectionFactory.ClientId, "DirectClientId", "Wrong client Id.");
+		}
+	}
+}

Copied: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs (from r1573636, activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs)
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs?p2=activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs&p1=activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs&r1=1573636&r2=1575473&rev=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/MultiProducersMultiConsumers.cs Sat Mar  8 02:49:55 2014
@@ -16,127 +16,137 @@
  */
 
 using System;
-using System.IO;
 using System.Threading;
 using NUnit.Framework;
 
 namespace Apache.NMS.ZMQ
 {
-	/// <summary>
-	/// Use to test and verify ZMQ behavior
-	/// </summary>
 	[TestFixture]
-	public class ZMQTest
+	public class ProducerConsumers : BaseTest
 	{
-		private bool receivedTestMessage = true;
+		private int totalMsgCountToReceive = 0;
 
-		[SetUp]
-		public void SetUp()
+		private class ConsumerTracker
 		{
-			// Setup before each test
-		}
-
-		[TearDown]
-		public void TearDown()
-		{
-			// Clean up after each test
+			public IMessageConsumer consumer;
+			public int msgCount = 0;
 		}
 
 		[Test]
-		public void TestReceive()
+		public void TestMultipleProducersConsumer(
+			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+			string destination,
+			[Values(1, 3)]
+			int numProducers,
+			[Values(1, 3)]
+			int numConsumers)
 		{
-			////////////////////////////
-			// Dependencies check
-			////////////////////////////
-			string libFolder = System.Environment.CurrentDirectory;
-			string libFileName;
-
-			libFileName = Path.Combine(libFolder, "clrzmq.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing zmq wrapper file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "libzmq.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing zmq library file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "libzmq64.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing 64-bit zmq library file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "Apache.NMS.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS library file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "Apache.NMS.ZMQ.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS.ZMQ library file: {0}", libFileName);
-
-			////////////////////////////
-			// Factory check
-			////////////////////////////
-			IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556", "");
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
 			Assert.IsNotNull(factory, "Error creating connection factory.");
 
-			////////////////////////////
-			// Connection check
-			////////////////////////////
-			IConnection connection = null;
-			try
-			{
-				connection = factory.CreateConnection();
-				Assert.IsNotNull(connection, "problem creating connection class, usually problem with libzmq and clrzmq ");
-			}
-			catch(System.Exception ex1)
-			{
-				Assert.Fail("Problem creating connection, make sure dependencies are present. Error: {0}", ex1.Message);
-			}
-
-			////////////////////////////
-			// Session check
-			////////////////////////////
-			ISession session = connection.CreateSession();
-			// Is session good?
-			Assert.IsNotNull(session, "Error creating Session.");
-
-			////////////////////////////
-			// Consumer check
-			////////////////////////////
-			IQueue testQueue = session.GetQueue("ZMQTestQueue");
-			Assert.IsNotNull(testQueue, "Error creating test queue.");
-			IMessageConsumer consumer = session.CreateConsumer(testQueue);
-			Assert.IsNotNull(consumer, "Error creating consumer.");
-
-			consumer.Listener += OnMessage;
-
-			////////////////////////////
-			// Producer check
-			////////////////////////////
-			IMessageProducer producer = session.CreateProducer(testQueue);
-			Assert.IsNotNull(consumer, "Error creating producer.");
-
-			ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
-			Assert.IsNotNull(testMsg, "Error creating test message.");
-
-			producer.Send(testMsg);
-
-			////////////////////////////
-			// Listener check
-			////////////////////////////
-			DateTime startWaitTime = DateTime.Now;
-			TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
-
-			while(!receivedTestMessage)
+			using(IConnection connection = factory.CreateConnection())
 			{
-				if((DateTime.Now - startWaitTime) > maxWaitTime)
+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+				using(ISession session = connection.CreateSession())
 				{
-					Assert.Fail("Timeout waiting for message receive.");
+					Assert.IsNotNull(session, "Error creating Session.");
+					IDestination testDestination = session.GetDestination(destination);
+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+
+					// Track the number of messages we should receive
+					this.totalMsgCountToReceive = numProducers * numConsumers;
+
+					ConsumerTracker[] consumerTrackers = null;
+					IMessageProducer[] producers = null;
+
+					try
+					{
+						// Create the consumers
+						consumerTrackers = new ConsumerTracker[numConsumers];
+						for(int index = 0; index < numConsumers; index++)
+						{
+							ConsumerTracker tracker = new ConsumerTracker();
+							tracker.consumer = session.CreateConsumer(testDestination);
+							Assert.IsNotNull(tracker.consumer, "Error creating consumer #{0} on {1}", index, destination);
+							tracker.consumer.Listener += (message) =>
+								{
+									Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
+									ITextMessage textMsg = (ITextMessage) message;
+									Assert.AreEqual(textMsg.Text, "Zero Message.");
+									tracker.msgCount++;
+								};
+							consumerTrackers[index] = tracker;
+						}
+
+						// Create the producers
+						producers = new IMessageProducer[numProducers];
+						for(int index = 0; index < numProducers; index++)
+						{
+							producers[index] = session.CreateProducer(testDestination);
+							Assert.IsNotNull(producers[index], "Error creating producer #{0} on {1}", index, destination);
+						}
+
+						// Send the messages
+						for(int index = 0; index < numProducers; index++)
+						{
+							ITextMessage testMsg = producers[index].CreateTextMessage("Zero Message.");
+							Assert.IsNotNull(testMsg, "Error creating test message for producer #{0}.", index);
+							producers[index].Send(testMsg);
+						}
+
+						// Wait for the message
+						DateTime startWaitTime = DateTime.Now;
+						TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
+
+						while(GetNumMsgsReceived(consumerTrackers) < this.totalMsgCountToReceive)
+						{
+							if((DateTime.Now - startWaitTime) > maxWaitTime)
+							{
+								Assert.Fail("Timeout waiting for message receive.");
+							}
+
+							Thread.Sleep(5);
+						}
+
+						// Sleep for an extra 2 seconds to see if any extra messages get delivered
+						Thread.Sleep(2 * 1000);
+						Assert.AreEqual(this.totalMsgCountToReceive, GetNumMsgsReceived(consumerTrackers), "Received too many messages.");
+					}
+					finally
+					{
+
+						// Clean up the producers
+						if(null != producers)
+						{
+							foreach(IMessageProducer producer in producers)
+							{
+								producer.Dispose();
+							}
+						}
+
+						// Clean up the consumers
+						if(null != consumerTrackers)
+						{
+							foreach(ConsumerTracker tracker in consumerTrackers)
+							{
+								tracker.consumer.Dispose();
+							}
+						}
+					}
 				}
-
-				Thread.Sleep(5);
 			}
 		}
 
-		/// <summary>
-		/// Receive messages sent to consumer.
-		/// </summary>
-		/// <param name="message"></param>
-		private void OnMessage(IMessage message)
+		private int GetNumMsgsReceived(ConsumerTracker[] consumerTrackers)
 		{
-			Assert.IsInstanceOf<ITextMessage>(message, "Wrong message type received.");
-			ITextMessage textMsg = (ITextMessage) message;
-			Assert.AreEqual(textMsg.Text, "Zero Message.");
-			receivedTestMessage = true;
+			int numMsgs = 0;
+
+			foreach(ConsumerTracker tracker in consumerTrackers)
+			{
+				numMsgs += tracker.msgCount;
+			}
+
+			return numMsgs;
 		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs Sat Mar  8 02:49:55 2014
@@ -16,114 +16,156 @@
  */
 
 using System;
-using System.IO;
 using System.Threading;
 using NUnit.Framework;
 
 namespace Apache.NMS.ZMQ
 {
-	/// <summary>
-	/// Use to test and verify ZMQ behavior
-	/// </summary>
 	[TestFixture]
-	public class ZMQTest
+	public class ZMQTest : BaseTest
 	{
 		private bool receivedTestMessage = true;
 
-		[SetUp]
-		public void SetUp()
+		[Test]
+		public void TestConnection()
 		{
-			// Setup before each test
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			using(IConnection connection = factory.CreateConnection())
+			{
+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+				Assert.IsInstanceOf<Connection>(connection, "Wrong connection type.");
+			}
 		}
 
-		[TearDown]
-		public void TearDown()
+		[Test]
+		public void TestSession()
 		{
-			// Clean up after each test
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			using(IConnection connection = factory.CreateConnection())
+			{
+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+				using(ISession session = connection.CreateSession())
+				{
+					Assert.IsNotNull(session, "Error creating session.");
+					Assert.IsInstanceOf<Session>(session, "Wrong session type.");
+				}
+			}
+		}
+
+		[Test, Sequential]
+		public void TestDestinations(
+			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+			string destination,
+			[Values(typeof(Queue), typeof(Topic), typeof(TemporaryQueue), typeof(TemporaryTopic))]
+			Type destinationType)
+		{
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			using(IConnection connection = factory.CreateConnection())
+			{
+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+				using(ISession session = connection.CreateSession())
+				{
+					Assert.IsNotNull(session, "Error creating session.");
+					IDestination testDestination = session.GetDestination(destination);
+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+					Assert.IsInstanceOf(destinationType, testDestination, "Wrong destintation type.");
+				}
+			}
 		}
 
 		[Test]
-		public void TestReceive()
+		public void TestProducers(
+			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+			string destination)
 		{
-			////////////////////////////
-			// Dependencies check
-			////////////////////////////
-			string libFolder = System.Environment.CurrentDirectory;
-			string libFileName;
-
-			libFileName = Path.Combine(libFolder, "clrzmq.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing zmq wrapper file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "libzmq.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing zmq library file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "libzmq64.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing 64-bit zmq library file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "Apache.NMS.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS library file: {0}", libFileName);
-			libFileName = Path.Combine(libFolder, "Apache.NMS.ZMQ.dll");
-			Assert.IsTrue(File.Exists(libFileName), "Missing Apache.NMS.ZMQ library file: {0}", libFileName);
-
-			////////////////////////////
-			// Factory check
-			////////////////////////////
-			IConnectionFactory factory = new ConnectionFactory("tcp://localhost:5556", "");
-			Assert.IsNotNull(factory, "Error creating connection factory.");
-
-			////////////////////////////
-			// Connection check
-			////////////////////////////
-			IConnection connection = null;
-			try
-			{
-				connection = factory.CreateConnection();
-				Assert.IsNotNull(connection, "problem creating connection class, usually problem with libzmq and clrzmq ");
-			}
-			catch(System.Exception ex1)
-			{
-				Assert.Fail("Problem creating connection, make sure dependencies are present. Error: {0}", ex1.Message);
-			}
-
-			////////////////////////////
-			// Session check
-			////////////////////////////
-			ISession session = connection.CreateSession();
-			// Is session good?
-			Assert.IsNotNull(session, "Error creating Session.");
-
-			////////////////////////////
-			// Consumer check
-			////////////////////////////
-			IQueue testQueue = session.GetQueue("ZMQTestQueue");
-			Assert.IsNotNull(testQueue, "Error creating test queue.");
-			IMessageConsumer consumer = session.CreateConsumer(testQueue);
-			Assert.IsNotNull(consumer, "Error creating consumer.");
-
-			consumer.Listener += OnMessage;
-
-			////////////////////////////
-			// Producer check
-			////////////////////////////
-			IMessageProducer producer = session.CreateProducer(testQueue);
-			Assert.IsNotNull(consumer, "Error creating producer.");
-
-			ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
-			Assert.IsNotNull(testMsg, "Error creating test message.");
-
-			producer.Send(testMsg);
-
-			////////////////////////////
-			// Listener check
-			////////////////////////////
-			DateTime startWaitTime = DateTime.Now;
-			TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			using(IConnection connection = factory.CreateConnection())
+			{
+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+				using(ISession session = connection.CreateSession())
+				{
+					Assert.IsNotNull(session, "Error creating session.");
+					IDestination testDestination = session.GetDestination(destination);
+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+					using(IMessageProducer producer = session.CreateProducer(testDestination))
+					{
+						Assert.IsNotNull(producer, "Error creating producer on {0}", destination);
+						Assert.IsInstanceOf<MessageProducer>(producer, "Wrong producer type.");
+					}
+				}
+			}
+		}
 
-			while(!receivedTestMessage)
+		[Test]
+		public void TestConsumers(
+			[Values("queue://ZMQTestQueue:", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+			string destination)
+		{
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			using(IConnection connection = factory.CreateConnection())
 			{
-				if((DateTime.Now - startWaitTime) > maxWaitTime)
+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+				using(ISession session = connection.CreateSession())
 				{
-					Assert.Fail("Timeout waiting for message receive.");
+					Assert.IsNotNull(session, "Error creating session.");
+					IDestination testDestination = session.GetDestination(destination);
+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+					using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
+					{
+						Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+						Assert.IsInstanceOf<MessageConsumer>(consumer, "Wrong consumer type.");
+					}
 				}
+			}
+		}
 
-				Thread.Sleep(5);
+		[Test]
+		public void TestSendReceive(
+			[Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
+			string destination)
+		{
+			IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556"));
+			Assert.IsNotNull(factory, "Error creating connection factory.");
+			using(IConnection connection = factory.CreateConnection())
+			{
+				Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq ");
+				using(ISession session = connection.CreateSession())
+				{
+					Assert.IsNotNull(session, "Error creating Session.");
+					IDestination testDestination = session.GetDestination(destination);
+					Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination);
+					using(IMessageConsumer consumer = session.CreateConsumer(testDestination))
+					{
+						Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
+						consumer.Listener += OnMessage;
+						using(IMessageProducer producer = session.CreateProducer(testDestination))
+						{
+							Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
+							ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
+							Assert.IsNotNull(testMsg, "Error creating test message.");
+							producer.Send(testMsg);
+						}
+
+						// Wait for the message
+						DateTime startWaitTime = DateTime.Now;
+						TimeSpan maxWaitTime = TimeSpan.FromSeconds(10);
+
+						while(!receivedTestMessage)
+						{
+							if((DateTime.Now - startWaitTime) > maxWaitTime)
+							{
+								Assert.Fail("Timeout waiting for message receive.");
+							}
+
+							Thread.Sleep(5);
+						}
+					}
+				}
 			}
 		}
 
@@ -133,13 +175,10 @@ namespace Apache.NMS.ZMQ
 		/// <param name="message"></param>
 		private void OnMessage(IMessage message)
 		{
-			Assert.IsInstanceOf<ITextMessage>(message, "Wrong message type received.");
+			Assert.IsInstanceOf<TextMessage>(message, "Wrong message type received.");
 			ITextMessage textMsg = (ITextMessage) message;
 			Assert.AreEqual(textMsg.Text, "Zero Message.");
 			receivedTestMessage = true;
 		}
 	}
 }
-
-
-

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj Sat Mar  8 02:49:55 2014
@@ -48,6 +48,9 @@
   </ItemGroup>
   <ItemGroup>
     <Compile Include="src\test\csharp\CommonAssemblyInfo.cs" />
+    <Compile Include="src\test\csharp\BaseTest.cs" />
+    <Compile Include="src\test\csharp\FactoryTests.cs" />
+    <Compile Include="src\test\csharp\MultiProducersMultiConsumers.cs" />
     <Compile Include="src\test\csharp\ZMQTest.cs" />
   </ItemGroup>
   <ItemGroup>

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj?rev=1575473&r1=1575472&r2=1575473&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj Sat Mar  8 02:49:55 2014
@@ -23,6 +23,8 @@
     <RegisterForComInterop>false</RegisterForComInterop>
     <PlatformTarget>AnyCPU</PlatformTarget>
     <TreatWarningsAsErrors>false</TreatWarningsAsErrors>
+    <NoWarn>
+    </NoWarn>
   </PropertyGroup>
   <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
     <DebugSymbols>true</DebugSymbols>
@@ -31,6 +33,8 @@
     <DefineConstants>TRACE;NET</DefineConstants>
     <AllowUnsafeBlocks>false</AllowUnsafeBlocks>
     <DebugType>full</DebugType>
+    <NoWarn>
+    </NoWarn>
   </PropertyGroup>
   <ItemGroup>
     <Reference Include="Apache.NMS, Version=1.5.0.2363, Culture=neutral, PublicKeyToken=82756feee3957618, processorArchitecture=MSIL">
@@ -69,6 +73,7 @@
       <SubType>Code</SubType>
     </Compile>
     <Compile Include="src\main\csharp\TextMessage.cs" />
+    <Compile Include="src\main\csharp\Utils.cs" />
   </ItemGroup>
   <ItemGroup>
     <Content Include="lib\clrzmq\net-4.0\libzmq.dll">



Mime
View raw message