activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r1145399 - in /activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk: ./ src/main/csharp/
Date Mon, 11 Jul 2011 23:51:27 GMT
Author: jgomes
Date: Mon Jul 11 23:51:26 2011
New Revision: 1145399

URL: http://svn.apache.org/viewvc?rev=1145399&view=rev
Log:
Fixed the loading of 32-bit vs. 64-bit implementations.
Refactoring the publisher/subscriber objects.
Fixes [AMQNET-333]. (See https://issues.apache.org/jira/browse/AMQNET-333)

Removed:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/ZmqSubscriber.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x64.csproj
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x86.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonConnectionFactory.cs?rev=1145399&r1=1145398&r2=1145399&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/CommonConnectionFactory.cs Mon Jul 11 23:51:26 2011
@@ -90,7 +90,7 @@ namespace Apache.NMS.ZMQ
 
 						try
 						{
-							factoryAssembly = Assembly.Load(fullFileName);
+							factoryAssembly = Assembly.LoadFile(fullFileName);
 							if(null != factoryAssembly)
 							{
 								Tracer.DebugFormat("Succesfully loaded provider: {0}", fullFileName);
@@ -126,18 +126,21 @@ namespace Apache.NMS.ZMQ
 			ArrayList pathList = new ArrayList();
 
 			// Check the current folder first.
-			pathList.Add("");
+			pathList.Add(Environment.CurrentDirectory);
 
-			// Check the folder the assembly is located in.
 			AppDomain currentDomain = AppDomain.CurrentDomain;
 
+			// Check the folder the assembly is located in.
 			pathList.Add(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location));
-			if(null != currentDomain.BaseDirectory)
+
+			// Check the domain's base directory
+			if(!string.IsNullOrEmpty(currentDomain.BaseDirectory))
 			{
 				pathList.Add(currentDomain.BaseDirectory);
 			}
 
-			if(null != currentDomain.RelativeSearchPath)
+			// Search the domain's relative paths.
+			if(!string.IsNullOrEmpty(currentDomain.RelativeSearchPath))
 			{
 				pathList.Add(currentDomain.RelativeSearchPath);
 			}

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs?rev=1145399&r1=1145398&r2=1145399&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs Mon Jul 11 23:51:26 2011
@@ -16,7 +16,7 @@
  */
 
 using System;
-using CLRZMQ = ZMQ;
+using ZContext = ZMQ.Context;
 
 namespace Apache.NMS.ZMQ
 {
@@ -38,7 +38,7 @@ namespace Apache.NMS.ZMQ
 		/// <summary>
 		/// ZMQ context 
 		/// </summary>
-		static private CLRZMQ.Context _context = new CLRZMQ.Context(1);
+		static private ZContext _context = new ZContext(1);
 
 		/// <summary>
 		/// Starts message delivery for this connection.
@@ -155,7 +155,7 @@ namespace Apache.NMS.ZMQ
 		/// <summary>
 		/// Gets ZMQ connection context
 		/// </summary>
-		static internal CLRZMQ.Context Context
+		static internal ZContext Context
 		{
 			get
 			{

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1145399&r1=1145398&r2=1145399&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Mon Jul 11 23:51:26 2011
@@ -16,9 +16,12 @@
  */
 
 using System;
+using System.Text;
 using System.Threading;
 using Apache.NMS.Util;
-using CLRZMQ = ZMQ;
+using ZSocket = ZMQ.Socket;
+using ZSocketType = ZMQ.SocketType;
+using ZSendRecvOpt = ZMQ.SendRecvOpt;
 
 namespace Apache.NMS.ZMQ
 {
@@ -31,7 +34,14 @@ namespace Apache.NMS.ZMQ
 
 		private readonly Session session;
 		private readonly AcknowledgementMode acknowledgementMode;
-		private ZmqSubscriber messageSubscriber;
+		/// <summary>
+		/// Socket object
+		/// </summary>
+		private ZSocket messageSubscriber = null;
+		/// <summary>
+		/// Context binding string
+		/// </summary>
+		private string contextBinding;
 		private event MessageListener listener;
 		private int listenerCount = 0;
 		private Thread asyncDeliveryThread = null;
@@ -45,11 +55,31 @@ namespace Apache.NMS.ZMQ
 			set { this.consumerTransformer = value; }
 		}
 
-		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, ZmqSubscriber messageSubscriber)
+		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, IDestination destination, string selector)
 		{
+			if(null == Connection.Context)
+			{
+				throw new NMSConnectionException();
+			}
+
 			this.session = session;
 			this.acknowledgementMode = acknowledgementMode;
-			this.messageSubscriber = messageSubscriber;
+			this.messageSubscriber = Connection.Context.Socket(ZSocketType.SUB);
+			if(null == this.messageSubscriber)
+			{
+				throw new ResourceAllocationException();
+			}
+
+			string clientId = session.Connection.ClientId;
+
+			this.contextBinding = session.Connection.BrokerUri.LocalPath;
+			if(!string.IsNullOrEmpty(clientId))
+			{
+				this.messageSubscriber.StringToIdentity(clientId, Encoding.Unicode);
+			}
+
+			this.messageSubscriber.Connect(contextBinding);
+			this.messageSubscriber.Subscribe(selector ?? string.Empty, Encoding.ASCII);
 		}
 
 		public event MessageListener Listener
@@ -87,7 +117,7 @@ namespace Apache.NMS.ZMQ
 			IMessage nmsMessage = null;
 			if(null != messageSubscriber)
 			{
-				string messageText = messageSubscriber.Subscriber.Recv(System.Text.Encoding.ASCII, CLRZMQ.SendRecvOpt.NOBLOCK);
+				string messageText = messageSubscriber.Recv(Encoding.ASCII, ZSendRecvOpt.NOBLOCK);
 				if(!string.IsNullOrEmpty(messageText))
 				{
 					nmsMessage = ToNmsMessage(messageText);
@@ -107,7 +137,7 @@ namespace Apache.NMS.ZMQ
 			IMessage nmsMessage = null;
 			if(null != messageSubscriber)
 			{
-				string messageText = messageSubscriber.Subscriber.Recv(System.Text.Encoding.ASCII, timeout.Milliseconds);
+				string messageText = messageSubscriber.Recv(Encoding.ASCII, timeout.Milliseconds);
 				if(!string.IsNullOrEmpty(messageText))
 				{
 					nmsMessage = ToNmsMessage(messageText);
@@ -173,7 +203,7 @@ namespace Apache.NMS.ZMQ
 			if(asyncDelivery.CompareAndSet(false, true))
 			{
 				asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
-				asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageSubscriber.Binding;
+				asyncDeliveryThread.Name = "Message Consumer Dispatch: " + contextBinding;
 				asyncDeliveryThread.IsBackground = true;
 				asyncDeliveryThread.Start();
 			}
@@ -277,7 +307,7 @@ namespace Apache.NMS.ZMQ
 		private ZmqMessage ToZmqMessage(string messageText)
 		{
 			ZmqMessage message = new ZmqMessage();
-			message.Destination = new Queue(session.Connection.BrokerUri.LocalPath);
+			message.Destination = new Queue(this.contextBinding);
 			message.ClientId = session.Connection.ClientId;
 			message.Text = messageText;
 			return message;

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1145399&r1=1145398&r2=1145399&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Mon Jul 11 23:51:26 2011
@@ -16,6 +16,9 @@
  */
 
 using System;
+using ZSocket = ZMQ.Socket;
+using ZSocketType = ZMQ.SocketType;
+using System.Text;
 
 namespace Apache.NMS.ZMQ
 {
@@ -24,11 +27,13 @@ namespace Apache.NMS.ZMQ
 	/// </summary>
 	public class MessageProducer : IMessageProducer
 	{
-
 		private readonly Session session;
-		private Destination destination;
+		private IDestination destination;
 
-		//private long messageCounter;
+		/// <summary>
+		/// Socket object
+		/// </summary>
+		private ZSocket messageProducer = null;
 		private MsgDeliveryMode deliveryMode;
 		private TimeSpan timeToLive;
 		private MsgPriority priority;
@@ -42,13 +47,24 @@ namespace Apache.NMS.ZMQ
 			set { this.producerTransformer = value; }
 		}
 
-		public MessageProducer(Session session, Destination destination)
+		public MessageProducer(Connection connection, Session session, IDestination destination)
 		{
+			if(null == Connection.Context)
+			{
+				throw new NMSConnectionException();
+			}
+
 			this.session = session;
 			this.destination = destination;
-			if(destination != null)
+			this.messageProducer = Connection.Context.Socket(ZSocketType.SUB);
+
+			string clientId = connection.ClientId;
+			if(!string.IsNullOrEmpty(clientId))
 			{
+				this.messageProducer.StringToIdentity(clientId, Encoding.Unicode);
 			}
+
+			this.messageProducer.Connect(connection.BrokerUri.LocalPath);
 		}
 
 		public void Send(IMessage message)
@@ -68,16 +84,21 @@ namespace Apache.NMS.ZMQ
 
 		public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
 		{
-			// TODO: Implement sending a message.
+			messageProducer.Send();
 		}
 
-		public void Close()
+		public void Dispose()
 		{
+			Close();
 		}
 
-		public void Dispose()
+		public void Close()
 		{
-			Close();
+			if(null != messageProducer)
+			{
+				messageProducer.Dispose();
+				messageProducer = null;
+			}
 		}
 
 		public IMessage CreateMessage()
@@ -144,7 +165,7 @@ namespace Apache.NMS.ZMQ
 		public IDestination Destination
 		{
 			get { return destination; }
-			set { destination = (Destination) value; }
+			set { destination = value; }
 		}
 
 		public MsgPriority Priority

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs?rev=1145399&r1=1145398&r2=1145399&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs Mon Jul 11 23:51:26 2011
@@ -57,7 +57,7 @@ namespace Apache.NMS.ZMQ
 
 		public IMessageProducer CreateProducer(IDestination destination)
 		{
-			throw new NotSupportedException("Producer is not supported/implemented");
+			return new MessageProducer(connection, this, destination);
 		}
 		#endregion
 
@@ -76,7 +76,7 @@ namespace Apache.NMS.ZMQ
 		{
 			// Subscriber client reads messages from a publisher and forwards messages 
 			// through the message consumer 
-			return new MessageConsumer(this, acknowledgementMode, new ZmqSubscriber(connection, destination, selector));
+			return new MessageConsumer(this, acknowledgementMode, destination, selector);
 		}
 
 		public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x64.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x64.csproj?rev=1145399&r1=1145398&r2=1145399&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x64.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x64.csproj Mon Jul 11 23:51:26 2011
@@ -70,7 +70,6 @@
       <SubType>Code</SubType>
     </Compile>
     <Compile Include="src\main\csharp\TextMessage.cs" />
-    <Compile Include="src\main\csharp\ZmqSubscriber.cs" />
     <Compile Include="src\main\csharp\ZmqMessage.cs" />
   </ItemGroup>
   <ItemGroup>

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x86.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x86.csproj?rev=1145399&r1=1145398&r2=1145399&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x86.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0x86.csproj Mon Jul 11 23:51:26 2011
@@ -70,7 +70,6 @@
       <SubType>Code</SubType>
     </Compile>
     <Compile Include="src\main\csharp\TextMessage.cs" />
-    <Compile Include="src\main\csharp\ZmqSubscriber.cs" />
     <Compile Include="src\main\csharp\ZmqMessage.cs" />
   </ItemGroup>
   <ItemGroup>



Mime
View raw message