Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AF54D101D2 for ; Wed, 19 Mar 2014 19:10:43 +0000 (UTC) Received: (qmail 2723 invoked by uid 500); 19 Mar 2014 19:10:29 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 2629 invoked by uid 500); 19 Mar 2014 19:10:20 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 2071 invoked by uid 99); 19 Mar 2014 19:10:01 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Mar 2014 19:10:01 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Mar 2014 19:09:57 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E0158238890D; Wed, 19 Mar 2014 19:09:34 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1579357 - in /activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src: main/csharp/BaseMessage.cs main/csharp/Connection.cs main/csharp/Destination.cs main/csharp/MessageConsumer.cs main/csharp/MessageProducer.cs test/csharp/ZMQTest.cs Date: Wed, 19 Mar 2014 19:09:34 -0000 To: commits@activemq.apache.org From: jgomes@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140319190934.E0158238890D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jgomes Date: Wed Mar 19 19:09:34 2014 New Revision: 1579357 URL: http://svn.apache.org/r1579357 Log: Implement disposable pattern for Connection. Only dispose the producer endpoint after its final release. Add overloaded send/receive API for destinations. Initialize the sockets on the correct message handler thread. Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs?rev=1579357&r1=1579356&r2=1579357&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs Wed Mar 19 19:09:34 2014 @@ -29,7 +29,7 @@ namespace Apache.NMS.ZMQ private string correlationId; private TimeSpan timeToLive; private string messageId; - private MsgDeliveryMode deliveryMode; + private MsgDeliveryMode deliveryMode = MsgDeliveryMode.NonPersistent; private MsgPriority priority; private Destination replyTo; private byte[] content; Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs?rev=1579357&r1=1579356&r2=1579357&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs Wed Mar 19 19:09:34 2014 @@ -19,6 +19,7 @@ using System; using ZeroMQ; using System.Collections.Generic; using System.Text; +using System.Collections; namespace Apache.NMS.ZMQ { @@ -46,24 +47,87 @@ namespace Apache.NMS.ZMQ /// /// ZMQ context /// + private static object contextLock = new object(); + private static int instanceCount = 0; private static ZmqContext _context; - private static Dictionary producerCache; - private static object producerCacheLock; + private static Dictionary producerCache = new Dictionary(); + private static object producerCacheLock = new object(); + private TimeSpan zeroTimeout = new TimeSpan(0); - static Connection() + private bool disposed = false; + + private static void InitContext() + { + lock(contextLock) + { + if(0 == instanceCount++) + { + Connection._context = ZmqContext.Create(); + } + } + } + + private static void DestroyContext() { - Connection._context = ZmqContext.Create(); - Connection.producerCache = new Dictionary(); - Connection.producerCacheLock = new object(); + lock(contextLock) + { + if(0 == --instanceCount) + { + Connection._context.Dispose(); + } + } } public Connection(Uri connectionUri) { + InitContext(); this.brokerUri = connectionUri; this.producerContextBinding = string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port); this.consumerContextBinding = string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host, this.brokerUri.Port); } + ~Connection() + { + Dispose(false); + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + private void Dispose(bool disposing) + { + if(disposed) + { + return; + } + + if(disposing) + { + try + { + OnDispose(); + } + catch(Exception ex) + { + Tracer.ErrorFormat("Exception disposing Connection {0}: {1}", this.brokerUri.AbsoluteUri, ex.Message); + } + } + + disposed = true; + } + + /// + /// Child classes can override this method to perform clean-up logic. + /// + protected virtual void OnDispose() + { + Close(); + DestroyContext(); + } + /// /// Starts message delivery for this connection. /// @@ -147,12 +211,13 @@ namespace Apache.NMS.ZMQ { producerCache.Remove(contextBinding); producerRef.producer.Unbind(contextBinding); + producerRef.producer.Dispose(); } } } } - internal ZmqSocket GetConsumer(Encoding encoding, string destinationName) + internal ZmqSocket GetConsumer() { ZmqSocket endpoint = this.Context.CreateSocket(SocketType.SUB); @@ -160,8 +225,6 @@ namespace Apache.NMS.ZMQ { throw new ResourceAllocationException(); } - endpoint.Subscribe(encoding.GetBytes(destinationName)); - endpoint.Connect(GetConsumerBindingPath()); return endpoint; } @@ -169,6 +232,7 @@ namespace Apache.NMS.ZMQ internal void ReleaseConsumer(ZmqSocket endpoint) { endpoint.Disconnect(GetConsumerBindingPath()); + endpoint.Dispose(); } internal string GetProducerContextBinding() @@ -176,16 +240,11 @@ namespace Apache.NMS.ZMQ return this.producerContextBinding; } - private string GetConsumerBindingPath() + internal string GetConsumerBindingPath() { return this.consumerContextBinding; } - public void Dispose() - { - Close(); - } - public void Close() { Stop(); Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1579357&r1=1579356&r2=1579357&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Wed Mar 19 19:09:34 2014 @@ -18,6 +18,7 @@ using System; using System.Text; using ZeroMQ; +using System.Diagnostics; namespace Apache.NMS.ZMQ { @@ -26,6 +27,8 @@ namespace Apache.NMS.ZMQ /// public abstract class Destination : IDestination { + public static Encoding encoding = Encoding.UTF8; + protected Session session; /// /// Socket object @@ -92,14 +95,12 @@ namespace Apache.NMS.ZMQ this.session.Connection.ReleaseProducer(this.producerEndpoint); } - this.producerEndpoint.Dispose(); this.producerEndpoint = null; } if(null != this.consumerEndpoint) { this.session.Connection.ReleaseConsumer(this.consumerEndpoint); - this.consumerEndpoint.Dispose(); this.consumerEndpoint = null; } } @@ -178,35 +179,82 @@ namespace Apache.NMS.ZMQ get; } - internal int Send(byte[] buffer, TimeSpan timeout) + internal void InitSender() { if(null == this.producerEndpoint) { this.producerEndpoint = this.session.Connection.GetProducer(); } - - return this.producerEndpoint.Send(buffer, buffer.Length, SocketFlags.None, timeout); } - internal string Receive(Encoding encoding, TimeSpan timeout) + internal void InitReceiver() { if(null == this.consumerEndpoint) { - this.consumerEndpoint = this.session.Connection.GetConsumer(encoding, this.destinationName); + Connection connection = this.session.Connection; + + this.consumerEndpoint = connection.GetConsumer(); + // Must subscribe first before connecting to the endpoint binding + this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName)); + this.consumerEndpoint.Connect(connection.GetConsumerBindingPath()); } + } + + internal void Subscribe(string prefixName) + { + InitReceiver(); + this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName)); + } + + internal void Unsubscribe(string prefixName) + { + if(null != this.consumerEndpoint) + { + this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName)); + } + } - return consumerEndpoint.Receive(encoding, timeout); + internal SendStatus Send(string msg) + { + Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send()."); + return this.producerEndpoint.Send(msg, Destination.encoding); + } + + internal SendStatus Send(byte[] buffer) + { + Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send()."); + return this.producerEndpoint.Send(buffer); + } + + internal string ReceiveString(TimeSpan timeout) + { + this.InitReceiver(); + return this.consumerEndpoint.Receive(Destination.encoding, timeout); + } + + internal byte[] ReceiveBytes(TimeSpan timeout, out int size) + { + this.InitReceiver(); + return this.consumerEndpoint.Receive(null, timeout, out size); + } + + internal byte[] ReceiveBytes(SocketFlags flags, out int size) + { + this.InitReceiver(); + return this.consumerEndpoint.Receive(null, flags, out size); } internal Frame ReceiveFrame() { // TODO: Implement + this.InitReceiver(); return null; } internal ZmqMessage ReceiveMessage() { // TODO: Implement + this.InitReceiver(); return null; } } Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1579357&r1=1579356&r2=1579357&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Mar 19 19:09:34 2014 @@ -15,14 +15,10 @@ * limitations under the License. */ -#define PUBSUB - using System; +using System.Diagnostics; using System.Text; using System.Threading; -using Apache.NMS.Util; -using ZeroMQ; -using System.Diagnostics; namespace Apache.NMS.ZMQ { @@ -31,7 +27,7 @@ namespace Apache.NMS.ZMQ /// public class MessageConsumer : IMessageConsumer { - protected TimeSpan zeroTimeout = new TimeSpan(0); + protected static readonly TimeSpan zeroTimeout = new TimeSpan(0); private readonly Session session; private readonly AcknowledgementMode acknowledgementMode; @@ -41,6 +37,8 @@ namespace Apache.NMS.ZMQ private Thread asyncDeliveryThread = null; private object asyncDeliveryLock = new object(); private bool asyncDelivery = false; + private bool asyncInit = false; + private byte[] rawDestinationName; private ConsumerTransformerDelegate consumerTransformer; public ConsumerTransformerDelegate ConsumerTransformer @@ -60,29 +58,40 @@ namespace Apache.NMS.ZMQ this.session = sess; this.destination = (Destination) dest; + this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name); this.acknowledgementMode = ackMode; } + private object listenerLock = new object(); public event MessageListener Listener { add { - this.listener += value; - this.listenerCount++; - StartAsyncDelivery(); + lock(listenerLock) + { + this.listener += value; + if(0 == this.listenerCount) + { + StartAsyncDelivery(); + } + + this.listenerCount++; + } } remove { - if(this.listenerCount > 0) + lock(listenerLock) { this.listener -= value; - this.listenerCount--; - } - - if(0 == listenerCount) - { - StopAsyncDelivery(); + if(this.listenerCount > 0) + { + this.listenerCount--; + if(0 == this.listenerCount) + { + StopAsyncDelivery(); + } + } } } } @@ -106,15 +115,17 @@ namespace Apache.NMS.ZMQ /// public IMessage Receive(TimeSpan timeout) { - // TODO: Support decoding of all message types + all meta data (e.g., headers and properties) - string msgContent = this.destination.Receive(Encoding.UTF8, timeout); + int size; + byte[] receivedMsg = this.destination.ReceiveBytes(timeout, out size); - if(null != msgContent) + if(size > 0) { // Strip off the subscribed destination name. - string destinationName = this.destination.Name; - string messageText = msgContent.Substring(destinationName.Length, msgContent.Length - destinationName.Length); - return ToNmsMessage(messageText); + // TODO: Support decoding of all message types + all meta data (e.g., headers and properties) + int msgStart = this.rawDestinationName.Length; + int msgLength = receivedMsg.Length - msgStart; + string msgContent = Encoding.UTF8.GetString(receivedMsg, msgStart, msgLength); + return ToNmsMessage(msgContent); } return null; @@ -150,7 +161,7 @@ namespace Apache.NMS.ZMQ protected virtual void StopAsyncDelivery() { - lock(asyncDeliveryLock) + lock(this.asyncDeliveryLock) { this.asyncDelivery = false; if(null != this.asyncDeliveryThread) @@ -174,33 +185,49 @@ namespace Apache.NMS.ZMQ Debug.Assert(null == this.asyncDeliveryThread); lock(this.asyncDeliveryLock) { + this.asyncInit = false; this.asyncDelivery = true; - this.asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop)); + this.asyncDeliveryThread = new Thread(new ThreadStart(MsgDispatchLoop)); this.asyncDeliveryThread.Name = string.Format("MsgConsumerAsync: {0}", this.destination.Name); this.asyncDeliveryThread.IsBackground = true; this.asyncDeliveryThread.Start(); + while(!asyncInit) + { + Thread.Sleep(1); + } } } - protected virtual void DispatchLoop() + protected virtual void MsgDispatchLoop() { Tracer.InfoFormat("Starting dispatcher thread consumer: {0}", this.asyncDeliveryThread.Name); - TimeSpan receiveWait = TimeSpan.FromSeconds(3); + TimeSpan receiveWait = TimeSpan.FromSeconds(2); + + // Signal that this thread has started. + asyncInit = true; while(asyncDelivery) { try { IMessage message = Receive(receiveWait); - if(asyncDelivery && message != null) + + if(asyncDelivery) { - try + if(null != message) { - listener(message); + try + { + listener(message); + } + catch(Exception ex) + { + HandleAsyncException(ex); + } } - catch(Exception ex) + else { - HandleAsyncException(ex); + Thread.Sleep(0); } } } Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1579357&r1=1579356&r2=1579357&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Wed Mar 19 19:09:34 2014 @@ -30,9 +30,9 @@ namespace Apache.NMS.ZMQ public class MessageProducer : IMessageProducer { private readonly Session session; - private IDestination destination; + private Destination destination; - private MsgDeliveryMode deliveryMode; + private MsgDeliveryMode deliveryMode = MsgDeliveryMode.NonPersistent; private TimeSpan timeToLive; private MsgPriority priority; private bool disableMessageID; @@ -53,17 +53,18 @@ namespace Apache.NMS.ZMQ } this.session = sess; - this.destination = dest; + this.destination = (Destination) dest; + this.destination.InitSender(); } public void Send(IMessage message) { - Send(this.Destination, message); + Send(this.destination, message); } public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive) { - Send(this.Destination, message, deliveryMode, priority, timeToLive); + Send(this.destination, message, deliveryMode, priority, timeToLive); } public void Send(IDestination dest, IMessage message) @@ -94,7 +95,7 @@ namespace Apache.NMS.ZMQ Destination theDest = (Destination) dest; string msg = theDest.Name + ((ITextMessage) message).Text; - theDest.Send(Encoding.UTF8.GetBytes(msg), this.session.Connection.RequestTimeout); + theDest.Send(msg); } public void Dispose() @@ -168,12 +169,6 @@ namespace Apache.NMS.ZMQ set { } } - public IDestination Destination - { - get { return this.destination; } - set { this.destination = value; } - } - public MsgPriority Priority { get { return this.priority; } Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs?rev=1579357&r1=1579356&r2=1579357&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs Wed Mar 19 19:09:34 2014 @@ -24,7 +24,7 @@ namespace Apache.NMS.ZMQ [TestFixture] public class ZMQTest : BaseTest { - private bool receivedTestMessage = false; + private int receivedMsgCount = 0; [Test] public void TestConnection() @@ -132,46 +132,62 @@ namespace Apache.NMS.ZMQ [Test] public void TestSendReceive( + // inproc, ipc, tcp, pgm, or epgm + [Values("zmq:tcp://localhost:5556", "zmq:inproc://localhost:5557")] + string connectionName, [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", "temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")] - string destination) + string destinationName) { - IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri("zmq:tcp://localhost:5556")); + IConnectionFactory factory = NMSConnectionFactory.CreateConnectionFactory(new Uri(connectionName)); Assert.IsNotNull(factory, "Error creating connection factory."); - this.receivedTestMessage = false; + this.receivedMsgCount = 0; using(IConnection connection = factory.CreateConnection()) { Assert.IsNotNull(connection, "Problem creating connection class. Usually problem with libzmq and clrzmq "); using(ISession session = connection.CreateSession()) { Assert.IsNotNull(session, "Error creating Session."); - using(IDestination testDestination = session.GetDestination(destination)) + using(IDestination testDestination = session.GetDestination(destinationName)) { - Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destination); + Assert.IsNotNull(testDestination, "Error creating test destination: {0}", destinationName); using(IMessageConsumer consumer = session.CreateConsumer(testDestination)) { - Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination); - consumer.Listener += OnMessage; - using(IMessageProducer producer = session.CreateProducer(testDestination)) + Assert.IsNotNull(consumer, "Error creating consumer on {0}", destinationName); + int sendMsgCount = 0; + try { - Assert.IsNotNull(consumer, "Error creating producer on {0}", destination); - ITextMessage testMsg = producer.CreateTextMessage("Zero Message."); - Assert.IsNotNull(testMsg, "Error creating test message."); - producer.Send(testMsg); - } - - // Wait for the message - DateTime startWaitTime = DateTime.Now; - TimeSpan maxWaitTime = TimeSpan.FromSeconds(5); - - while(!receivedTestMessage) - { - if((DateTime.Now - startWaitTime) > maxWaitTime) + consumer.Listener += OnMessage; + using(IMessageProducer producer = session.CreateProducer(testDestination)) { - Assert.Fail("Timeout waiting for message receive."); - } + Assert.IsNotNull(consumer, "Error creating producer on {0}", destinationName); + ITextMessage testMsg = producer.CreateTextMessage("Zero Message."); + Assert.IsNotNull(testMsg, "Error creating test message."); + + // Wait for the message + DateTime startWaitTime = DateTime.Now; + TimeSpan maxWaitTime = TimeSpan.FromSeconds(5); + + // Continually send the message to compensate for the + // slow joiner problem inherent to spinning up the + // internal dispatching threads in ZeroMQ. + while(this.receivedMsgCount < 1) + { + ++sendMsgCount; + producer.Send(testMsg); + if((DateTime.Now - startWaitTime) > maxWaitTime) + { + Assert.Fail("Timeout waiting for message receive."); + } - Thread.Sleep(5); + Thread.Sleep(1); + } + } + } + finally + { + consumer.Listener -= OnMessage; + Console.WriteLine("Sent {0} msgs.\nReceived {1} msgs", sendMsgCount, this.receivedMsgCount); } } } @@ -188,7 +204,7 @@ namespace Apache.NMS.ZMQ Assert.IsInstanceOf(message, "Wrong message type received."); ITextMessage textMsg = (ITextMessage) message; Assert.AreEqual(textMsg.Text, "Zero Message."); - receivedTestMessage = true; + this.receivedMsgCount++; } } }