activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [20/50] [abbrv] activemq-nms-msmq git commit: fix for: https://issues.apache.org/activemq/browse/AMQNET-271
Date Tue, 07 Mar 2017 19:39:00 GMT
fix for: https://issues.apache.org/activemq/browse/AMQNET-271


Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/commit/48f3e707
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/tree/48f3e707
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/diff/48f3e707

Branch: refs/heads/1.4.x
Commit: 48f3e707949d41bb18b37a675575b2a823ba176f
Parents: be02596
Author: Timothy A. Bish <tabish@apache.org>
Authored: Sun Aug 29 18:44:38 2010 +0000
Committer: Timothy A. Bish <tabish@apache.org>
Committed: Sun Aug 29 18:44:38 2010 +0000

----------------------------------------------------------------------
 trunk/src/main/csharp/Connection.cs        | 402 +++++++++---------
 trunk/src/main/csharp/ConnectionFactory.cs | 212 +++++-----
 trunk/src/main/csharp/MessageConsumer.cs   | 433 ++++++++++----------
 trunk/src/main/csharp/MessageProducer.cs   | 524 ++++++++++++------------
 trunk/src/main/csharp/Session.cs           | 464 +++++++++++----------
 5 files changed, 1058 insertions(+), 977 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/Connection.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/Connection.cs b/trunk/src/main/csharp/Connection.cs
index 096e41f..13dda20 100644
--- a/trunk/src/main/csharp/Connection.cs
+++ b/trunk/src/main/csharp/Connection.cs
@@ -19,198 +19,212 @@ using System;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// Represents a NMS connection MSMQ.  Since the underlying MSMQ APIs are actually
-	/// connectionless, NMS connection in the MSMQ case are not expensive operations.
-	/// </summary>
-	///
-	public class Connection : IConnection
-	{
-		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
-		private IMessageConverter messageConverter = new DefaultMessageConverter();
-
-		private IRedeliveryPolicy redeliveryPolicy;
-		private ConnectionMetaData metaData = null;
-		private bool connected;
-		private bool closed;
-		private string clientId;
-
-		/// <summary>
-		/// Starts message delivery for this connection.
-		/// </summary>
-		public void Start()
-		{
-			CheckConnected();
-		}
-
-		/// <summary>
-		/// This property determines if the asynchronous message delivery of incoming
-		/// messages has been started for this connection.
-		/// </summary>
-		public bool IsStarted
-		{
-			get { return true; }
-		}
-
-		/// <summary>
-		/// Stop message delivery for this connection.
-		/// </summary>
-		public void Stop()
-		{
-			CheckConnected();
-		}
-
-		/// <summary>
-		/// Creates a new session to work on this connection
-		/// </summary>
-		public ISession CreateSession()
-		{
-			return CreateSession(acknowledgementMode);
-		}
-
-		/// <summary>
-		/// Creates a new session to work on this connection
-		/// </summary>
-		public ISession CreateSession(AcknowledgementMode mode)
-		{
-			CheckConnected();
-			return new Session(this, mode);
-		}
-
-		public void Dispose()
-		{
-			closed = true;
-		}
-
-		/// <summary>
-		/// The default timeout for network requests.
-		/// </summary>
-		public TimeSpan RequestTimeout
-		{
-			get { return NMSConstants.defaultRequestTimeout; }
-			set { }
-		}
-
-		public AcknowledgementMode AcknowledgementMode
-		{
-			get { return acknowledgementMode; }
-			set { acknowledgementMode = value; }
-		}
-
-		public IMessageConverter MessageConverter
-		{
-			get { return messageConverter; }
-			set { messageConverter = value; }
-		}
-
-		public string ClientId
-		{
-			get { return clientId; }
-			set
-			{
-				if(connected)
-				{
-					throw new NMSException("You cannot change the ClientId once the Connection is connected");
-				}
-				clientId = value;
-			}
-		}
-
-		/// <summary>
-		/// Get/or set the redelivery policy for this connection.
-		/// </summary>
-		public IRedeliveryPolicy RedeliveryPolicy
-		{
-			get { return this.redeliveryPolicy; }
-			set { this.redeliveryPolicy = value; }
-		}
-
-		/// <summary>
-		/// Gets the Meta Data for the NMS Connection instance.
-		/// </summary>
-		public IConnectionMetaData MetaData
-		{
-			get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
-		}
-
-		/// <summary>
-		/// A delegate that can receive transport level exceptions.
-		/// </summary>
-		public event ExceptionListener ExceptionListener;
-
-		/// <summary>
-		/// An asynchronous listener that is notified when a Fault tolerant connection
-		/// has been interrupted.
-		/// </summary>
-		public event ConnectionInterruptedListener ConnectionInterruptedListener;
-
-		/// <summary>
-		/// An asynchronous listener that is notified when a Fault tolerant connection
-		/// has been resumed.
-		/// </summary>
-		public event ConnectionResumedListener ConnectionResumedListener;
-
-		protected void CheckConnected()
-		{
-			if(closed)
-			{
-				throw new NMSException("Connection Closed");
-			}
-			if(!connected)
-			{
-				connected = true;
-				// now lets send the connection and see if we get an ack/nak
-				// TODO: establish a connection
-			}
-		}
-
-		public void Close()
-		{
-			Dispose();
-		}
-
-		public void HandleException(Exception e)
-		{
-			if(ExceptionListener != null && !this.closed)
-			{
-				ExceptionListener(e);
-			}
-			else
-			{
-				Tracer.Error(e);
-			}
-		}
-
-		public void HandleTransportInterrupted()
-		{
-			Tracer.Debug("Transport has been Interrupted.");
-
-			if(this.ConnectionInterruptedListener != null && !this.closed)
-			{
-				try
-				{
-					this.ConnectionInterruptedListener();
-				}
-				catch
-				{
-				}
-			}
-		}
-
-		public void HandleTransportResumed()
-		{
-			Tracer.Debug("Transport has resumed normal operation.");
-
-			if(this.ConnectionResumedListener != null && !this.closed)
-			{
-				try
-				{
-					this.ConnectionResumedListener();
-				}
-				catch
-				{
-				}
-			}
-		}
-	}
+    /// <summary>
+    /// Represents an NMS connection to MSMQ.  Since the underlying MSMQ APIs are actually
+    /// connectionless, NMS connections in the MSMQ case are not expensive operations.
+    /// </summary>
+    public class Connection : IConnection
+    {
+        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+        private IMessageConverter messageConverter = new DefaultMessageConverter();
+
+        private IRedeliveryPolicy redeliveryPolicy;
+        private ConsumerTransformerDelegate consumerTransformer;
+        private ProducerTransformerDelegate producerTransformer;
+        private ConnectionMetaData metaData = null;
+        private bool connected;
+        private bool closed;
+        private string clientId;
+
+        /// <summary>
+        /// Starts message delivery for this connection.  In this implementation the
+        /// call only verifies that the connection is not closed and marks it connected.
+        /// </summary>
+        /// <exception cref="NMSException">The connection has been closed.</exception>
+        public void Start()
+        {
+            CheckConnected();
+        }
+
+        /// <summary>
+        /// This property determines if the asynchronous message delivery of incoming
+        /// messages has been started for this connection.  This implementation
+        /// always reports <c>true</c>.
+        /// </summary>
+        public bool IsStarted
+        {
+            get { return true; }
+        }
+
+        /// <summary>
+        /// Stop message delivery for this connection.  In this implementation the
+        /// call only verifies that the connection is not closed and marks it connected.
+        /// </summary>
+        /// <exception cref="NMSException">The connection has been closed.</exception>
+        public void Stop()
+        {
+            CheckConnected();
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection, using the connection's
+        /// current <see cref="AcknowledgementMode"/>.
+        /// </summary>
+        /// <exception cref="NMSException">The connection has been closed.</exception>
+        public ISession CreateSession()
+        {
+            return CreateSession(acknowledgementMode);
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection.
+        /// </summary>
+        /// <param name="mode">The acknowledgement mode handed to the new session.</param>
+        /// <exception cref="NMSException">The connection has been closed.</exception>
+        public ISession CreateSession(AcknowledgementMode mode)
+        {
+            CheckConnected();
+            return new Session(this, mode);
+        }
+
+        /// <summary>
+        /// Marks the connection as closed.  Afterwards <see cref="CheckConnected"/>
+        /// throws and the listener events are no longer raised.
+        /// </summary>
+        public void Dispose()
+        {
+            closed = true;
+        }
+
+        /// <summary>
+        /// The default timeout for network requests.  The getter always returns
+        /// <c>NMSConstants.defaultRequestTimeout</c>; the setter discards the
+        /// assigned value.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        /// <summary>
+        /// Get/or set the acknowledgement mode used by the parameterless
+        /// <see cref="CreateSession()"/> overload.
+        /// </summary>
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+            set { acknowledgementMode = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the message converter held by this connection.
+        /// </summary>
+        public IMessageConverter MessageConverter
+        {
+            get { return messageConverter; }
+            set { messageConverter = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the client id of this connection.
+        /// </summary>
+        /// <exception cref="NMSException">
+        /// The setter was used after the connection had been marked connected.
+        /// </exception>
+        public string ClientId
+        {
+            get { return clientId; }
+            set
+            {
+                if(connected)
+                {
+                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
+                }
+                clientId = value;
+            }
+        }
+
+        /// <summary>
+        /// Get/or set the redelivery policy for this connection.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set { this.redeliveryPolicy = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the consumer transformer delegate held by this connection.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the producer transformer delegate held by this connection.
+        /// </summary>
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Gets the Meta Data for the NMS Connection instance.  The instance is
+        /// created on first access and reused afterwards.
+        /// </summary>
+        public IConnectionMetaData MetaData
+        {
+            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+        }
+
+        /// <summary>
+        /// A delegate that can receive transport level exceptions.
+        /// </summary>
+        public event ExceptionListener ExceptionListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been interrupted.
+        /// </summary>
+        public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been resumed.
+        /// </summary>
+        public event ConnectionResumedListener ConnectionResumedListener;
+
+        /// <summary>
+        /// Verifies that the connection has not been closed and marks it as
+        /// connected on first use.
+        /// </summary>
+        /// <exception cref="NMSException">The connection has been closed.</exception>
+        protected void CheckConnected()
+        {
+            if(closed)
+            {
+                throw new NMSException("Connection Closed");
+            }
+            if(!connected)
+            {
+                connected = true;
+                // now lets send the connection and see if we get an ack/nak
+                // TODO: establish a connection
+            }
+        }
+
+        /// <summary>
+        /// Closes the connection; equivalent to <see cref="Dispose"/>.
+        /// </summary>
+        public void Close()
+        {
+            Dispose();
+        }
+
+        /// <summary>
+        /// Forwards an exception to the <see cref="ExceptionListener"/> subscribers.
+        /// When nobody is subscribed, or the connection is closed, the exception
+        /// is written to the tracer instead.
+        /// </summary>
+        /// <param name="e">The exception to report.</param>
+        public void HandleException(Exception e)
+        {
+            // Snapshot the delegate so an unsubscribe on another thread cannot
+            // null it between the check and the call.
+            ExceptionListener handler = this.ExceptionListener;
+
+            if(handler != null && !this.closed)
+            {
+                handler(e);
+            }
+            else
+            {
+                Tracer.Error(e);
+            }
+        }
+
+        /// <summary>
+        /// Notifies <see cref="ConnectionInterruptedListener"/> subscribers unless
+        /// the connection is closed.  Exceptions thrown by a subscriber are
+        /// logged and not propagated.
+        /// </summary>
+        public void HandleTransportInterrupted()
+        {
+            Tracer.Debug("Transport has been Interrupted.");
+
+            ConnectionInterruptedListener handler = this.ConnectionInterruptedListener;
+
+            if(handler != null && !this.closed)
+            {
+                try
+                {
+                    handler();
+                }
+                catch(Exception ex)
+                {
+                    // A faulty listener must not break the caller, but the
+                    // failure should still be visible in the trace output.
+                    Tracer.ErrorFormat("Exception in ConnectionInterruptedListener: {0}", ex.Message);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Notifies <see cref="ConnectionResumedListener"/> subscribers unless
+        /// the connection is closed.  Exceptions thrown by a subscriber are
+        /// logged and not propagated.
+        /// </summary>
+        public void HandleTransportResumed()
+        {
+            Tracer.Debug("Transport has resumed normal operation.");
+
+            ConnectionResumedListener handler = this.ConnectionResumedListener;
+
+            if(handler != null && !this.closed)
+            {
+                try
+                {
+                    handler();
+                }
+                catch(Exception ex)
+                {
+                    // A faulty listener must not break the caller, but the
+                    // failure should still be visible in the trace output.
+                    Tracer.ErrorFormat("Exception in ConnectionResumedListener: {0}", ex.Message);
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/ConnectionFactory.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/ConnectionFactory.cs b/trunk/src/main/csharp/ConnectionFactory.cs
index 24e895d..fe9beee 100644
--- a/trunk/src/main/csharp/ConnectionFactory.cs
+++ b/trunk/src/main/csharp/ConnectionFactory.cs
@@ -19,101 +19,119 @@ using Apache.NMS.Policies;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// A Factory that can estbalish NMS connections to MSMQ
-	/// </summary>
-	public class ConnectionFactory : IConnectionFactory
-	{
-		public const string DEFAULT_BROKER_URL = "msmq://localhost";
-		public const string ENV_BROKER_URL = "MSMQ_BROKER_URL";
-		private Uri brokerUri;
-		private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
-
-		public static string GetDefaultBrokerUrl()
-		{
-			string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
-			if(answer == null)
-			{
-				answer = DEFAULT_BROKER_URL;
-			}
-			return answer;
-		}
-
-		public ConnectionFactory()
-			: this(GetDefaultBrokerUrl())
-		{
-		}
-
-		public ConnectionFactory(string brokerUri)
-			: this(brokerUri, null)
-		{
-		}
-
-		public ConnectionFactory(string brokerUri, string clientID)
-			: this(new Uri(brokerUri), clientID)
-		{
-		}
-
-		public ConnectionFactory(Uri brokerUri)
-			: this(brokerUri, null)
-		{
-		}
-
-		public ConnectionFactory(Uri brokerUri, string clientID)
-		{
-			this.brokerUri = brokerUri;
-		}
-
-		/// <summary>
-		/// Creates a new connection to MSMQ.
-		/// </summary>
-		public IConnection CreateConnection()
-		{
-			return CreateConnection(string.Empty, string.Empty, false);
-		}
-
-		/// <summary>
-		/// Creates a new connection to MSMQ.
-		/// </summary>
-		public IConnection CreateConnection(string userName, string password)
-		{
-			return CreateConnection(userName, password, false);
-		}
-
-		/// <summary>
-		/// Creates a new connection to MSMQ.
-		/// </summary>
-		public IConnection CreateConnection(string userName, string password, bool useLogging)
-		{
-			IConnection connection = new Connection();
-
-			connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
-			return connection;
-		}
-
-		/// <summary>
-		/// Get/or set the broker Uri.
-		/// </summary>
-		public Uri BrokerUri
-		{
-			get { return brokerUri; }
-			set { brokerUri = value; }
-		}
-
-		/// <summary>
-		/// Get/or set the redelivery policy that new IConnection objects are
-		/// assigned upon creation.
-		/// </summary>
-		public IRedeliveryPolicy RedeliveryPolicy
-		{
-			get { return this.redeliveryPolicy; }
-			set
-			{
-				if(value != null)
-				{
-					this.redeliveryPolicy = value;
-				}
-			}
-		}
-	}
+    /// <summary>
+    /// A factory that can establish NMS connections to MSMQ.
+    /// </summary>
+    public class ConnectionFactory : IConnectionFactory
+    {
+        public const string DEFAULT_BROKER_URL = "msmq://localhost";
+        public const string ENV_BROKER_URL = "MSMQ_BROKER_URL";
+
+        private Uri brokerUri;
+        private string clientId;
+        private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+        private ConsumerTransformerDelegate consumerTransformer;
+        private ProducerTransformerDelegate producerTransformer;
+
+        /// <summary>
+        /// Returns the broker URL taken from the <c>MSMQ_BROKER_URL</c> environment
+        /// variable, or <see cref="DEFAULT_BROKER_URL"/> when the variable is not set.
+        /// </summary>
+        public static string GetDefaultBrokerUrl()
+        {
+            return Environment.GetEnvironmentVariable(ENV_BROKER_URL) ?? DEFAULT_BROKER_URL;
+        }
+
+        /// <summary>
+        /// Creates a factory for the URL returned by <see cref="GetDefaultBrokerUrl"/>.
+        /// </summary>
+        public ConnectionFactory()
+            : this(GetDefaultBrokerUrl())
+        {
+        }
+
+        /// <summary>
+        /// Creates a factory for the given broker URL without a client id.
+        /// </summary>
+        /// <param name="brokerUri">The broker URL; it is parsed with <see cref="Uri"/>.</param>
+        public ConnectionFactory(string brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        /// <summary>
+        /// Creates a factory for the given broker URL and client id.
+        /// </summary>
+        /// <param name="brokerUri">The broker URL; it is parsed with <see cref="Uri"/>.</param>
+        /// <param name="clientID">Client id assigned to created connections; may be null.</param>
+        public ConnectionFactory(string brokerUri, string clientID)
+            : this(new Uri(brokerUri), clientID)
+        {
+        }
+
+        /// <summary>
+        /// Creates a factory for the given broker Uri without a client id.
+        /// </summary>
+        /// <param name="brokerUri">The broker Uri.</param>
+        public ConnectionFactory(Uri brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        /// <summary>
+        /// Creates a factory for the given broker Uri and client id.
+        /// </summary>
+        /// <param name="brokerUri">The broker Uri.</param>
+        /// <param name="clientID">Client id assigned to created connections; may be null.</param>
+        public ConnectionFactory(Uri brokerUri, string clientID)
+        {
+            this.brokerUri = brokerUri;
+            // Previously this argument was dropped, so the client-id overloads
+            // had no effect on the connections produced by the factory.
+            this.clientId = clientID;
+        }
+
+        /// <summary>
+        /// Creates a new connection to MSMQ.
+        /// </summary>
+        public IConnection CreateConnection()
+        {
+            return CreateConnection(string.Empty, string.Empty, false);
+        }
+
+        /// <summary>
+        /// Creates a new connection to MSMQ.
+        /// </summary>
+        /// <param name="userName">Not used by this implementation.</param>
+        /// <param name="password">Not used by this implementation.</param>
+        public IConnection CreateConnection(string userName, string password)
+        {
+            return CreateConnection(userName, password, false);
+        }
+
+        /// <summary>
+        /// Creates a new connection to MSMQ.  The connection receives a clone of
+        /// the factory's redelivery policy, the factory's transformer delegates
+        /// and, when one was supplied to the constructor, the client id.
+        /// </summary>
+        /// <param name="userName">Not used by this implementation.</param>
+        /// <param name="password">Not used by this implementation.</param>
+        /// <param name="useLogging">Not used by this implementation.</param>
+        public IConnection CreateConnection(string userName, string password, bool useLogging)
+        {
+            Connection connection = new Connection();
+
+            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+            connection.ConsumerTransformer = this.consumerTransformer;
+            connection.ProducerTransformer = this.producerTransformer;
+
+            // Leave ClientId untouched when none was supplied so factories
+            // built without a client id behave exactly as before.
+            if(this.clientId != null)
+            {
+                connection.ClientId = this.clientId;
+            }
+
+            return connection;
+        }
+
+        /// <summary>
+        /// Get/or set the broker Uri.
+        /// </summary>
+        public Uri BrokerUri
+        {
+            get { return brokerUri; }
+            set { brokerUri = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the redelivery policy that new IConnection objects are
+        /// assigned upon creation.  Assigning null is ignored and keeps the
+        /// current policy.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set
+            {
+                if(value != null)
+                {
+                    this.redeliveryPolicy = value;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Get/or set the consumer transformer delegate that new connections
+        /// are assigned upon creation.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the producer transformer delegate that new connections
+        /// are assigned upon creation.
+        /// </summary>
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/MessageConsumer.cs b/trunk/src/main/csharp/MessageConsumer.cs
index ee57b96..708a372 100644
--- a/trunk/src/main/csharp/MessageConsumer.cs
+++ b/trunk/src/main/csharp/MessageConsumer.cs
@@ -21,211 +21,230 @@ using Apache.NMS.Util;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// An object capable of receiving messages from some destination
-	/// </summary>
-	public class MessageConsumer : IMessageConsumer
-	{
-		protected TimeSpan zeroTimeout = new TimeSpan(0);
-
-		private readonly Session session;
-		private readonly AcknowledgementMode acknowledgementMode;
-		private MessageQueue messageQueue;
-		private event MessageListener listener;
-		private int listenerCount = 0;
-		private Thread asyncDeliveryThread = null;
-		private AutoResetEvent pause = new AutoResetEvent(false);
-		private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
-
-		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
-		{
-			this.session = session;
-			this.acknowledgementMode = acknowledgementMode;
-			this.messageQueue = messageQueue;
-			if(null != this.messageQueue)
-			{
-				this.messageQueue.MessageReadPropertyFilter.SetAll();
-			}
-		}
-
-		public event MessageListener Listener
-		{
-			add
-			{
-				listener += value;
-				listenerCount++;
-				StartAsyncDelivery();
-			}
-
-			remove
-			{
-				if(listenerCount > 0)
-				{
-					listener -= value;
-					listenerCount--;
-				}
-
-				if(0 == listenerCount)
-				{
-					StopAsyncDelivery();
-				}
-			}
-		}
-
-		public IMessage Receive()
-		{
-			IMessage nmsMessage = null;
-
-			if(messageQueue != null)
-			{
-				Message message;
-
-				try
-				{
-					message = messageQueue.Receive(zeroTimeout);
-				}
-				catch
-				{
-					message = null;
-				}
-
-				if(null == message)
-				{
-					ReceiveCompletedEventHandler receiveMsg =
-							delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
-								message = messageQueue.EndReceive(asyncResult.AsyncResult);
-								pause.Set();
-							};
-
-					messageQueue.ReceiveCompleted += receiveMsg;
-					messageQueue.BeginReceive();
-					pause.WaitOne();
-					messageQueue.ReceiveCompleted -= receiveMsg;
-				}
-
-				nmsMessage = ToNmsMessage(message);
-			}
-
-			return nmsMessage;
-		}
-
-		public IMessage Receive(TimeSpan timeout)
-		{
-			IMessage nmsMessage = null;
-
-			if(messageQueue != null)
-			{
-				Message message = messageQueue.Receive(timeout);
-				nmsMessage = ToNmsMessage(message);
-			}
-
-			return nmsMessage;
-		}
-
-		public IMessage ReceiveNoWait()
-		{
-			IMessage nmsMessage = null;
-
-			if(messageQueue != null)
-			{
-				Message message = messageQueue.Receive(zeroTimeout);
-				nmsMessage = ToNmsMessage(message);
-			}
-
-			return nmsMessage;
-		}
-
-		public void Dispose()
-		{
-			Close();
-		}
-
-		public void Close()
-		{
-			StopAsyncDelivery();
-			if(messageQueue != null)
-			{
-				messageQueue.Dispose();
-				messageQueue = null;
-			}
-		}
-
-		protected virtual void StopAsyncDelivery()
-		{
-			if(asyncDelivery.CompareAndSet(true, false))
-			{
-				if(null != asyncDeliveryThread)
-				{
-					Tracer.Info("Stopping async delivery thread.");
-					pause.Set();
-					if(!asyncDeliveryThread.Join(10000))
-					{
-						Tracer.Info("Aborting async delivery thread.");
-						asyncDeliveryThread.Abort();
-					}
-
-					asyncDeliveryThread = null;
-					Tracer.Info("Async delivery thread stopped.");
-				}
-			}
-		}
-
-		protected virtual void StartAsyncDelivery()
-		{
-			if(asyncDelivery.CompareAndSet(false, true))
-			{
-				asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
-				asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName;
-				asyncDeliveryThread.IsBackground = true;
-				asyncDeliveryThread.Start();
-			}
-		}
-
-		protected virtual void DispatchLoop()
-		{
-			Tracer.Info("Starting dispatcher thread consumer: " + this);
-			while(asyncDelivery.Value)
-			{
-				try
-				{
-					IMessage message = Receive();
-					if(asyncDelivery.Value && message != null)
-					{
-						try
-						{
-							listener(message);
-						}
-						catch(Exception e)
-						{
-							HandleAsyncException(e);
-						}
-					}
-				}
-				catch(ThreadAbortException ex)
-				{
-					Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
-					break;
-				}
-				catch(Exception ex)
-				{
-					Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
-				}
-			}
-			Tracer.Info("Stopping dispatcher thread consumer: " + this);
-		}
-
-		protected virtual void HandleAsyncException(Exception e)
-		{
-			session.Connection.HandleException(e);
-		}
-
-		protected virtual IMessage ToNmsMessage(Message message)
-		{
-			if(message == null)
-			{
-				return null;
-			}
-			return session.MessageConverter.ToNmsMessage(message);
-		}
-	}
+    /// <summary>
+    /// An object capable of receiving messages from some destination.  Messages
+    /// can be pulled with the Receive methods or pushed to <see cref="Listener"/>
+    /// subscribers from a background dispatch thread.
+    /// </summary>
+    public class MessageConsumer : IMessageConsumer
+    {
+        protected TimeSpan zeroTimeout = new TimeSpan(0);
+
+        private readonly Session session;
+        private readonly AcknowledgementMode acknowledgementMode;
+        private MessageQueue messageQueue;
+        private event MessageListener listener;
+        private int listenerCount = 0;
+        private Thread asyncDeliveryThread = null;
+        // Signalled when an asynchronous receive completes and when the
+        // dispatch thread is asked to stop; it releases a blocked Receive().
+        private AutoResetEvent pause = new AutoResetEvent(false);
+        private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+
+        private ConsumerTransformerDelegate consumerTransformer;
+
+        /// <summary>
+        /// Optional transformer applied to every converted message before it is
+        /// handed to the caller.  When it returns null the converted message is
+        /// used unchanged.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Creates a consumer reading from the given queue.
+        /// </summary>
+        /// <param name="session">The owning session.</param>
+        /// <param name="acknowledgementMode">The acknowledgement mode stored for this consumer.</param>
+        /// <param name="messageQueue">
+        /// The queue to read from.  May be null, in which case the Receive
+        /// methods return null.
+        /// </param>
+        public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
+        {
+            this.session = session;
+            this.acknowledgementMode = acknowledgementMode;
+            this.messageQueue = messageQueue;
+            if(null != this.messageQueue)
+            {
+                this.messageQueue.MessageReadPropertyFilter.SetAll();
+            }
+        }
+
+        /// <summary>
+        /// Asynchronous message listener.  Adding a subscriber starts the
+        /// dispatch thread; removing the last one stops it.
+        /// </summary>
+        public event MessageListener Listener
+        {
+            add
+            {
+                listener += value;
+                listenerCount++;
+                StartAsyncDelivery();
+            }
+
+            remove
+            {
+                if(listenerCount > 0)
+                {
+                    listener -= value;
+                    listenerCount--;
+                }
+
+                if(0 == listenerCount)
+                {
+                    StopAsyncDelivery();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Waits for the next message.  Returns null when the consumer has no
+        /// queue, or when the wait is released by <see cref="StopAsyncDelivery"/>
+        /// before a message arrived.
+        /// </summary>
+        public IMessage Receive()
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                Message message;
+
+                // Fast path: take a message that is already waiting.  Any
+                // failure here is treated as "nothing available yet".
+                try
+                {
+                    message = messageQueue.Receive(zeroTimeout);
+                }
+                catch
+                {
+                    message = null;
+                }
+
+                if(null == message)
+                {
+                    // Slow path: start an asynchronous receive and block on
+                    // 'pause' until it completes or delivery is stopped.
+                    ReceiveCompletedEventHandler receiveMsg =
+                            delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
+                                message = messageQueue.EndReceive(asyncResult.AsyncResult);
+                                pause.Set();
+                            };
+
+                    messageQueue.ReceiveCompleted += receiveMsg;
+                    messageQueue.BeginReceive();
+                    pause.WaitOne();
+                    messageQueue.ReceiveCompleted -= receiveMsg;
+                }
+
+                nmsMessage = ToNmsMessage(message);
+            }
+
+            return nmsMessage;
+        }
+
+        /// <summary>
+        /// Waits up to <paramref name="timeout"/> for a message.
+        /// </summary>
+        /// <param name="timeout">Maximum time to wait.</param>
+        /// <returns>
+        /// The received message, or null when the consumer has no queue or no
+        /// message arrived before the time-out expired.
+        /// </returns>
+        public IMessage Receive(TimeSpan timeout)
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                nmsMessage = ToNmsMessage(ReceiveOrNull(timeout));
+            }
+
+            return nmsMessage;
+        }
+
+        /// <summary>
+        /// Returns a message only if one is immediately available.
+        /// </summary>
+        /// <returns>
+        /// The received message, or null when the consumer has no queue or the
+        /// queue is currently empty.
+        /// </returns>
+        public IMessage ReceiveNoWait()
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                nmsMessage = ToNmsMessage(ReceiveOrNull(zeroTimeout));
+            }
+
+            return nmsMessage;
+        }
+
+        /// <summary>
+        /// Receives from the queue and maps the MSMQ time-out error to null so
+        /// that an empty queue is reported as "no message" rather than as an
+        /// exception.  Every other queue error is rethrown unchanged.
+        /// </summary>
+        private Message ReceiveOrNull(TimeSpan timeout)
+        {
+            try
+            {
+                return messageQueue.Receive(timeout);
+            }
+            catch(MessageQueueException ex)
+            {
+                if(ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
+                {
+                    throw;
+                }
+
+                return null;
+            }
+        }
+
+        /// <summary>
+        /// Closes the consumer; equivalent to <see cref="Close"/>.
+        /// </summary>
+        public void Dispose()
+        {
+            Close();
+        }
+
+        /// <summary>
+        /// Stops asynchronous delivery and disposes the underlying queue handle.
+        /// </summary>
+        public void Close()
+        {
+            StopAsyncDelivery();
+            if(messageQueue != null)
+            {
+                messageQueue.Dispose();
+                messageQueue = null;
+            }
+        }
+
+        /// <summary>
+        /// Stops the dispatch thread if it is running.  The thread is given ten
+        /// seconds to exit after being signalled and is aborted otherwise.
+        /// </summary>
+        protected virtual void StopAsyncDelivery()
+        {
+            if(asyncDelivery.CompareAndSet(true, false))
+            {
+                if(null != asyncDeliveryThread)
+                {
+                    Tracer.Info("Stopping async delivery thread.");
+                    // Release a Receive() that is blocked waiting for a message.
+                    pause.Set();
+                    if(!asyncDeliveryThread.Join(10000))
+                    {
+                        Tracer.Info("Aborting async delivery thread.");
+                        asyncDeliveryThread.Abort();
+                    }
+
+                    asyncDeliveryThread = null;
+                    Tracer.Info("Async delivery thread stopped.");
+                }
+            }
+        }
+
+        /// <summary>
+        /// Starts the background dispatch thread if it is not already running.
+        /// </summary>
+        protected virtual void StartAsyncDelivery()
+        {
+            if(asyncDelivery.CompareAndSet(false, true))
+            {
+                asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
+                asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName;
+                asyncDeliveryThread.IsBackground = true;
+                asyncDeliveryThread.Start();
+            }
+        }
+
+        /// <summary>
+        /// Body of the dispatch thread: receives messages and passes them to the
+        /// listener until asynchronous delivery is switched off.  Listener
+        /// failures are routed to <see cref="HandleAsyncException"/>; receive
+        /// failures are logged and the loop continues.
+        /// </summary>
+        protected virtual void DispatchLoop()
+        {
+            Tracer.Info("Starting dispatcher thread consumer: " + this);
+            while(asyncDelivery.Value)
+            {
+                try
+                {
+                    IMessage message = Receive();
+                    // Re-check the flag: delivery may have been stopped while
+                    // Receive() was blocked.
+                    if(asyncDelivery.Value && message != null)
+                    {
+                        try
+                        {
+                            listener(message);
+                        }
+                        catch(Exception e)
+                        {
+                            HandleAsyncException(e);
+                        }
+                    }
+                }
+                catch(ThreadAbortException ex)
+                {
+                    Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
+                    break;
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
+                }
+            }
+            Tracer.Info("Stopping dispatcher thread consumer: " + this);
+        }
+
+        /// <summary>
+        /// Reports an exception thrown by a message listener to the owning
+        /// connection.
+        /// </summary>
+        /// <param name="e">The exception thrown by the listener.</param>
+        protected virtual void HandleAsyncException(Exception e)
+        {
+            session.Connection.HandleException(e);
+        }
+
+        /// <summary>
+        /// Converts an MSMQ message to an NMS message with the session's message
+        /// converter and then applies <see cref="ConsumerTransformer"/>, if set.
+        /// </summary>
+        /// <param name="message">The MSMQ message; may be null.</param>
+        /// <returns>
+        /// Null when <paramref name="message"/> is null; otherwise the
+        /// transformer's result when it is non-null, else the converted message.
+        /// </returns>
+        protected virtual IMessage ToNmsMessage(Message message)
+        {
+            if(message == null)
+            {
+                return null;
+            }
+
+            IMessage converted = session.MessageConverter.ToNmsMessage(message);
+
+            if(this.ConsumerTransformer != null)
+            {
+                // The transformer works on NMS messages, so hand it the
+                // converted message rather than the raw MSMQ one.
+                IMessage newMessage = ConsumerTransformer(this.session, this, converted);
+                if(newMessage != null)
+                {
+                    converted = newMessage;
+                }
+            }
+
+            return converted;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/MessageProducer.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/MessageProducer.cs b/trunk/src/main/csharp/MessageProducer.cs
index 26598d3..5912c07 100644
--- a/trunk/src/main/csharp/MessageProducer.cs
+++ b/trunk/src/main/csharp/MessageProducer.cs
@@ -19,258 +19,274 @@ using System.Messaging;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// An object capable of sending messages to some destination
-	/// </summary>
-	public class MessageProducer : IMessageProducer
-	{
-
-		private readonly Session session;
-		private Destination destination;
-
-		//private long messageCounter;
-		private MsgDeliveryMode deliveryMode;
-		private TimeSpan timeToLive;
-		private MsgPriority priority;
-		private bool disableMessageID;
-		private bool disableMessageTimestamp;
-
-		private MessageQueue messageQueue;
-
-		public MessageProducer(Session session, Destination destination)
-		{
-			this.session = session;
-			this.destination = destination;
-			if(destination != null)
-			{
-				messageQueue = openMessageQueue(destination);
-			}
-		}
-
-		private MessageQueue openMessageQueue(Destination dest)
-		{
-			MessageQueue rc = null;
-			try
-			{
-				if(!MessageQueue.Exists(dest.Path))
-				{
-					// create the new message queue and make it transactional
-					rc = MessageQueue.Create(dest.Path, session.Transacted);
-					this.destination.Path = rc.Path;
-				}
-				else
-				{
-					rc = new MessageQueue(dest.Path);
-					this.destination.Path = rc.Path;
-					if(!rc.CanWrite)
-					{
-						throw new NMSSecurityException("Do not have write access to: " + dest);
-					}
-				}
-			}
-			catch(Exception e)
-			{
-				if(rc != null)
-				{
-					rc.Dispose();
-				}
-
-				throw new NMSException(e.Message + ": " + dest, e);
-			}
-			return rc;
-		}
-
-		public void Send(IMessage message)
-		{
-			Send(Destination, message);
-		}
-
-		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-		{
-			Send(Destination, message, deliveryMode, priority, timeToLive);
-		}
-
-		public void Send(IDestination destination, IMessage message)
-		{
-			Send(destination, message, DeliveryMode, Priority, TimeToLive);
-		}
-
-		public void Send(IDestination destination, IMessage imessage, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-		{
-			BaseMessage message = (BaseMessage) imessage;
-			MessageQueue mq = null;
-
-			try
-			{
-				// Locate the MSMQ Queue we will be sending to
-				if(messageQueue != null)
-				{
-					if(destination.Equals(this.destination))
-					{
-						mq = messageQueue;
-					}
-					else
-					{
-						throw new NMSException("This producer can only be used to send to: " + destination);
-					}
-				}
-				else
-				{
-					mq = openMessageQueue((Destination) destination);
-				}
-
-				message.NMSDeliveryMode = deliveryMode;
-				message.NMSTimeToLive = timeToLive;
-				message.NMSPriority = priority;
-				if(!DisableMessageTimestamp)
-				{
-					message.NMSTimestamp = DateTime.UtcNow;
-				}
-
-				if(!DisableMessageID)
-				{
-					// TODO: message.NMSMessageId =
-				}
-
-				// Convert the Mesasge into a MSMQ message
-				Message msg = session.MessageConverter.ToMsmqMessage(message);
-
-				if(mq.Transactional)
-				{
-					if(session.Transacted)
-					{
-						mq.Send(msg, session.MessageQueueTransaction);
-
-					}
-					else
-					{
-						// Start our own mini transaction here to send the message.
-						using(MessageQueueTransaction transaction = new MessageQueueTransaction())
-						{
-							transaction.Begin();
-							mq.Send(msg, transaction);
-							transaction.Commit();
-						}
-					}
-				}
-				else
-				{
-					if(session.Transacted)
-					{
-						// We may want to raise an exception here since app requested
-						// a transeced NMS session, but is using a non transacted message queue
-						// For now silently ignore it.
-					}
-					mq.Send(msg);
-				}
-
-			}
-			finally
-			{
-				if(mq != null && mq != messageQueue)
-				{
-					mq.Dispose();
-				}
-			}
-		}
-
-		public void Close()
-		{
-			if(messageQueue != null)
-			{
-				messageQueue.Dispose();
-				messageQueue = null;
-			}
-		}
-
-		public void Dispose()
-		{
-			Close();
-		}
-
-		public IMessage CreateMessage()
-		{
-			return session.CreateMessage();
-		}
-
-		public ITextMessage CreateTextMessage()
-		{
-			return session.CreateTextMessage();
-		}
-
-		public ITextMessage CreateTextMessage(String text)
-		{
-			return session.CreateTextMessage(text);
-		}
-
-		public IMapMessage CreateMapMessage()
-		{
-			return session.CreateMapMessage();
-		}
-
-		public IObjectMessage CreateObjectMessage(Object body)
-		{
-			return session.CreateObjectMessage(body);
-		}
-
-		public IBytesMessage CreateBytesMessage()
-		{
-			return session.CreateBytesMessage();
-		}
-
-		public IBytesMessage CreateBytesMessage(byte[] body)
-		{
-			return session.CreateBytesMessage(body);
-		}
-
-		public IStreamMessage CreateStreamMessage()
-		{
-			return session.CreateStreamMessage();
-		}
-
-		public MsgDeliveryMode DeliveryMode
-		{
-			get { return deliveryMode; }
-			set { deliveryMode = value; }
-		}
-
-		public TimeSpan TimeToLive
-		{
-			get { return timeToLive; }
-			set { timeToLive = value; }
-		}
-
-		/// <summary>
-		/// The default timeout for network requests.
-		/// </summary>
-		public TimeSpan RequestTimeout
-		{
-			get { return NMSConstants.defaultRequestTimeout; }
-			set { }
-		}
-
-		public IDestination Destination
-		{
-			get { return destination; }
-			set { destination = (Destination) value; }
-		}
-
-		public MsgPriority Priority
-		{
-			get { return priority; }
-			set { priority = value; }
-		}
-
-		public bool DisableMessageID
-		{
-			get { return disableMessageID; }
-			set { disableMessageID = value; }
-		}
-
-		public bool DisableMessageTimestamp
-		{
-			get { return disableMessageTimestamp; }
-			set { disableMessageTimestamp = value; }
-		}
-	}
+    /// <summary>
+    /// An object capable of sending messages to some destination
+    /// </summary>
+    /// <remarks>
+    /// A producer constructed with a destination opens that destination's MSMQ
+    /// queue once and keeps it until Close(). A producer constructed without a
+    /// destination keeps no queue open: every send opens a queue and disposes it
+    /// again when the send finishes.
+    /// </remarks>
+    public class MessageProducer : IMessageProducer
+    {
+
+        private readonly Session session;
+        private Destination destination;
+
+        //private long messageCounter;
+        private MsgDeliveryMode deliveryMode;
+        private TimeSpan timeToLive;
+        private MsgPriority priority;
+        private bool disableMessageID;
+        private bool disableMessageTimestamp;
+
+        // Queue opened by the constructor when a destination was supplied;
+        // null otherwise, and null again after Close().
+        private MessageQueue messageQueue;
+
+        private ProducerTransformerDelegate producerTransformer;
+
+        /// <summary>
+        /// Optional callback invoked by Send before the message is converted to an
+        /// MSMQ message; a non-null result replaces the message being sent.
+        /// </summary>
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Creates a producer for the given session.
+        /// </summary>
+        /// <param name="session">The session that owns this producer.</param>
+        /// <param name="destination">
+        /// Destination to send to, or null; when non-null its queue is opened
+        /// immediately.
+        /// </param>
+        public MessageProducer(Session session, Destination destination)
+        {
+            this.session = session;
+            this.destination = destination;
+            if(destination != null)
+            {
+                messageQueue = openMessageQueue(destination);
+            }
+        }
+
+        /// <summary>
+        /// Opens the MSMQ queue named by the destination's Path, creating the queue
+        /// when it does not exist, and stores the queue's own Path back on the
+        /// destination.
+        /// </summary>
+        /// <param name="dest">Destination whose Path names the queue.</param>
+        /// <returns>The opened queue.</returns>
+        /// <exception cref="NMSSecurityException">An existing queue reports CanWrite as false.</exception>
+        /// <exception cref="NMSException">Any other failure while opening or creating the queue.</exception>
+        private MessageQueue openMessageQueue(Destination dest)
+        {
+            MessageQueue rc = null;
+            try
+            {
+                if(!MessageQueue.Exists(dest.Path))
+                {
+                    // create the new message queue; it is transactional only when
+                    // the session is transacted
+                    rc = MessageQueue.Create(dest.Path, session.Transacted);
+                    // Update the destination that was actually opened: this.destination
+                    // is null for a producer created without a destination.
+                    dest.Path = rc.Path;
+                }
+                else
+                {
+                    rc = new MessageQueue(dest.Path);
+                    dest.Path = rc.Path;
+                    if(!rc.CanWrite)
+                    {
+                        throw new NMSSecurityException("Do not have write access to: " + dest);
+                    }
+                }
+            }
+            catch(NMSException)
+            {
+                // Already an NMS exception (the security failure above): release the
+                // queue and rethrow unwrapped so callers can catch the specific type.
+                if(rc != null)
+                {
+                    rc.Dispose();
+                }
+
+                throw;
+            }
+            catch(Exception e)
+            {
+                if(rc != null)
+                {
+                    rc.Dispose();
+                }
+
+                throw new NMSException(e.Message + ": " + dest, e);
+            }
+            return rc;
+        }
+
+        /// <summary>
+        /// Sends the message to this producer's Destination using the producer's
+        /// DeliveryMode, Priority and TimeToLive.
+        /// </summary>
+        public void Send(IMessage message)
+        {
+            Send(Destination, message);
+        }
+
+        /// <summary>
+        /// Sends the message to this producer's Destination using the given
+        /// delivery mode, priority and time to live.
+        /// </summary>
+        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            Send(Destination, message, deliveryMode, priority, timeToLive);
+        }
+
+        /// <summary>
+        /// Sends the message to the given destination using the producer's
+        /// DeliveryMode, Priority and TimeToLive.
+        /// </summary>
+        public void Send(IDestination destination, IMessage message)
+        {
+            Send(destination, message, DeliveryMode, Priority, TimeToLive);
+        }
+
+        /// <summary>
+        /// Sends the message to the given destination with explicit delivery
+        /// settings. All other Send overloads end up here.
+        /// </summary>
+        /// <param name="destination">
+        /// Target destination. When this producer holds an open queue it must equal
+        /// the producer's own destination.
+        /// </param>
+        /// <param name="imessage">Message to send; must be a BaseMessage.</param>
+        /// <param name="deliveryMode">Value stored in the message's NMSDeliveryMode.</param>
+        /// <param name="priority">Value stored in the message's NMSPriority.</param>
+        /// <param name="timeToLive">Value stored in the message's NMSTimeToLive.</param>
+        /// <exception cref="NMSException">
+        /// The producer holds an open queue for a different destination, or the
+        /// queue could not be opened.
+        /// </exception>
+        public void Send(IDestination destination, IMessage imessage, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            BaseMessage message = (BaseMessage) imessage;
+            MessageQueue mq = null;
+
+            try
+            {
+                // Locate the MSMQ Queue we will be sending to
+                if(messageQueue != null)
+                {
+                    if(destination.Equals(this.destination))
+                    {
+                        mq = messageQueue;
+                    }
+                    else
+                    {
+                        // Report the destination this producer is bound to, not the
+                        // one that was just rejected.
+                        throw new NMSException("This producer can only be used to send to: " + this.destination);
+                    }
+                }
+                else
+                {
+                    mq = openMessageQueue((Destination) destination);
+                }
+
+                if(this.ProducerTransformer != null)
+                {
+                    IMessage transformed = this.ProducerTransformer(this.session, this, message);
+                    if(transformed != null)
+                    {
+                        // The rest of this method works on a BaseMessage, so the
+                        // transformer's result needs the same explicit cast that is
+                        // applied to the caller's message.
+                        message = (BaseMessage) transformed;
+                    }
+                }
+
+                message.NMSDeliveryMode = deliveryMode;
+                message.NMSTimeToLive = timeToLive;
+                message.NMSPriority = priority;
+                if(!DisableMessageTimestamp)
+                {
+                    message.NMSTimestamp = DateTime.UtcNow;
+                }
+
+                if(!DisableMessageID)
+                {
+                    // TODO: message.NMSMessageId =
+                }
+
+                // Convert the Message into a MSMQ message
+                Message msg = session.MessageConverter.ToMsmqMessage(message);
+
+                if(mq.Transactional)
+                {
+                    if(session.Transacted)
+                    {
+                        mq.Send(msg, session.MessageQueueTransaction);
+
+                    }
+                    else
+                    {
+                        // Start our own mini transaction here to send the message.
+                        using(MessageQueueTransaction transaction = new MessageQueueTransaction())
+                        {
+                            transaction.Begin();
+                            mq.Send(msg, transaction);
+                            transaction.Commit();
+                        }
+                    }
+                }
+                else
+                {
+                    if(session.Transacted)
+                    {
+                        // We may want to raise an exception here since app requested
+                        // a transacted NMS session, but is using a non transacted message queue
+                        // For now silently ignore it.
+                    }
+                    mq.Send(msg);
+                }
+
+            }
+            finally
+            {
+                // Only dispose a queue opened for this call; the queue opened by
+                // the constructor is released in Close().
+                if(mq != null && mq != messageQueue)
+                {
+                    mq.Dispose();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Disposes the queue opened by the constructor, if any. Safe to call more
+        /// than once.
+        /// </summary>
+        public void Close()
+        {
+            if(messageQueue != null)
+            {
+                messageQueue.Dispose();
+                messageQueue = null;
+            }
+        }
+
+        /// <summary>
+        /// Same as Close().
+        /// </summary>
+        public void Dispose()
+        {
+            Close();
+        }
+
+        // The Create*Message methods below all delegate to the owning session.
+
+        public IMessage CreateMessage()
+        {
+            return session.CreateMessage();
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            return session.CreateTextMessage();
+        }
+
+        public ITextMessage CreateTextMessage(String text)
+        {
+            return session.CreateTextMessage(text);
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return session.CreateMapMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(Object body)
+        {
+            return session.CreateObjectMessage(body);
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return session.CreateBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            return session.CreateBytesMessage(body);
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return session.CreateStreamMessage();
+        }
+
+        /// <summary>
+        /// Delivery mode used by the Send overloads that do not take one.
+        /// </summary>
+        public MsgDeliveryMode DeliveryMode
+        {
+            get { return deliveryMode; }
+            set { deliveryMode = value; }
+        }
+
+        /// <summary>
+        /// Time to live used by the Send overloads that do not take one.
+        /// </summary>
+        public TimeSpan TimeToLive
+        {
+            get { return timeToLive; }
+            set { timeToLive = value; }
+        }
+
+        /// <summary>
+        /// The default timeout for network requests. The setter ignores its value.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        /// <summary>
+        /// Destination used by the Send overloads that do not take one. Setting it
+        /// does not open or replace the queue opened by the constructor.
+        /// </summary>
+        public IDestination Destination
+        {
+            get { return destination; }
+            set { destination = (Destination) value; }
+        }
+
+        /// <summary>
+        /// Priority used by the Send overloads that do not take one.
+        /// </summary>
+        public MsgPriority Priority
+        {
+            get { return priority; }
+            set { priority = value; }
+        }
+
+        /// <summary>
+        /// Stored flag only: Send does not assign message IDs yet (see the TODO
+        /// there), so the value currently has no effect.
+        /// </summary>
+        public bool DisableMessageID
+        {
+            get { return disableMessageID; }
+            set { disableMessageID = value; }
+        }
+
+        /// <summary>
+        /// When true, Send does not set the message's NMSTimestamp.
+        /// </summary>
+        public bool DisableMessageTimestamp
+        {
+            get { return disableMessageTimestamp; }
+            set { disableMessageTimestamp = value; }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/Session.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/Session.cs b/trunk/src/main/csharp/Session.cs
index 6ee3d9e..71ee69c 100644
--- a/trunk/src/main/csharp/Session.cs
+++ b/trunk/src/main/csharp/Session.cs
@@ -19,229 +19,243 @@ using System.Messaging;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// MSQM provider of ISession
-	/// </summary>
-	public class Session : ISession
-	{
-		private Connection connection;
-		private AcknowledgementMode acknowledgementMode;
-		private MessageQueueTransaction messageQueueTransaction;
-		private IMessageConverter messageConverter;
-
-		public Session(Connection connection, AcknowledgementMode acknowledgementMode)
-		{
-			this.connection = connection;
-			this.acknowledgementMode = acknowledgementMode;
-			MessageConverter = connection.MessageConverter;
-			if(this.acknowledgementMode == AcknowledgementMode.Transactional)
-			{
-				MessageQueueTransaction = new MessageQueueTransaction();
-			}
-		}
-
-		public void Dispose()
-		{
-			if(MessageQueueTransaction != null)
-			{
-				MessageQueueTransaction.Dispose();
-			}
-		}
-
-		public IMessageProducer CreateProducer()
-		{
-			return CreateProducer(null);
-		}
-
-		public IMessageProducer CreateProducer(IDestination destination)
-		{
-			return new MessageProducer(this, (Destination) destination);
-		}
-
-		public IMessageConsumer CreateConsumer(IDestination destination)
-		{
-			return CreateConsumer(destination, null);
-		}
-
-		public IMessageConsumer CreateConsumer(IDestination destination, string selector)
-		{
-			return CreateConsumer(destination, selector, false);
-		}
-
-		public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
-		{
-			if(selector != null)
-			{
-				throw new NotSupportedException("Selectors are not supported by MSMQ");
-			}
-			MessageQueue queue = MessageConverter.ToMsmqDestination(destination);
-			return new MessageConsumer(this, acknowledgementMode, queue);
-		}
-
-		public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
-		{
-			throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
-		}
-
-		public void DeleteDurableConsumer(string name)
-		{
-			throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
-		}
-
-		public IQueueBrowser CreateBrowser(IQueue queue)
-		{
-			throw new NotImplementedException();
-		}
-
-		public IQueueBrowser CreateBrowser(IQueue queue, string selector)
-		{
-			throw new NotImplementedException();
-		}
-
-		public IQueue GetQueue(string name)
-		{
-			return new Queue(name);
-		}
-
-		public ITopic GetTopic(string name)
-		{
-			throw new NotSupportedException("Topics are not supported by MSMQ");
-		}
-
-		public ITemporaryQueue CreateTemporaryQueue()
-		{
-			throw new NotSupportedException("Tempoary Queues are not supported by MSMQ");
-		}
-
-		public ITemporaryTopic CreateTemporaryTopic()
-		{
-			throw new NotSupportedException("Tempoary Topics are not supported by MSMQ");
-		}
-
-		/// <summary>
-		/// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
-		/// </summary>
-		public void DeleteDestination(IDestination destination)
-		{
-			// TODO: Implement if possible.  If not possible, then change exception to NotSupportedException().
-			throw new NotImplementedException();
-		}
-
-		public IMessage CreateMessage()
-		{
-			BaseMessage answer = new BaseMessage();
-			return answer;
-		}
-
-
-		public ITextMessage CreateTextMessage()
-		{
-			TextMessage answer = new TextMessage();
-			return answer;
-		}
-
-		public ITextMessage CreateTextMessage(string text)
-		{
-			TextMessage answer = new TextMessage(text);
-			return answer;
-		}
-
-		public IMapMessage CreateMapMessage()
-		{
-			return new MapMessage();
-		}
-
-		public IBytesMessage CreateBytesMessage()
-		{
-			return new BytesMessage();
-		}
-
-		public IBytesMessage CreateBytesMessage(byte[] body)
-		{
-			BytesMessage answer = new BytesMessage();
-			answer.Content = body;
-			return answer;
-		}
-
-		public IStreamMessage CreateStreamMessage()
-		{
-			return new StreamMessage();
-		}
-
-		public IObjectMessage CreateObjectMessage(Object body)
-		{
-			ObjectMessage answer = new ObjectMessage();
-			answer.Body = body;
-			return answer;
-		}
-
-		public void Commit()
-		{
-			if(!Transacted)
-			{
-				throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
-			}
-			messageQueueTransaction.Commit();
-		}
-
-		public void Rollback()
-		{
-			if(!Transacted)
-			{
-				throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
-			}
-			messageQueueTransaction.Abort();
-		}
-
-		// Properties
-		public Connection Connection
-		{
-			get { return connection; }
-		}
-
-		/// <summary>
-		/// The default timeout for network requests.
-		/// </summary>
-		public TimeSpan RequestTimeout
-		{
-			get { return NMSConstants.defaultRequestTimeout; }
-			set { }
-		}
-
-		public bool Transacted
-		{
-			get { return acknowledgementMode == AcknowledgementMode.Transactional; }
-		}
-
-		public AcknowledgementMode AcknowledgementMode
-		{
-			get { throw new NotImplementedException(); }
-		}
-
-		public MessageQueueTransaction MessageQueueTransaction
-		{
-			get
-			{
-				if(null != messageQueueTransaction
-					&& messageQueueTransaction.Status != MessageQueueTransactionStatus.Pending)
-				{
-					messageQueueTransaction.Begin();
-				}
-
-				return messageQueueTransaction;
-			}
-			set { messageQueueTransaction = value; }
-		}
-
-		public IMessageConverter MessageConverter
-		{
-			get { return messageConverter; }
-			set { messageConverter = value; }
-		}
-
-		public void Close()
-		{
-			Dispose();
-		}
-	}
+    /// <summary>
+    /// MSMQ provider of ISession
+    /// </summary>
+    /// <remarks>
+    /// A session created with AcknowledgementMode.Transactional owns one
+    /// MessageQueueTransaction, which Commit() and Rollback() act on.
+    /// </remarks>
+    public class Session : ISession
+    {
+        private Connection connection;
+        private AcknowledgementMode acknowledgementMode;
+        private MessageQueueTransaction messageQueueTransaction;
+        private IMessageConverter messageConverter;
+
+        /// <summary>
+        /// Creates a session on the given connection, taking over the connection's
+        /// message converter and creating a transaction object when the mode is
+        /// Transactional.
+        /// </summary>
+        public Session(Connection connection, AcknowledgementMode acknowledgementMode)
+        {
+            this.connection = connection;
+            this.acknowledgementMode = acknowledgementMode;
+            MessageConverter = connection.MessageConverter;
+            if(this.acknowledgementMode == AcknowledgementMode.Transactional)
+            {
+                MessageQueueTransaction = new MessageQueueTransaction();
+            }
+        }
+
+        /// <summary>
+        /// Disposes the session's transaction object, if it has one.
+        /// </summary>
+        public void Dispose()
+        {
+            // Use the field, not the MessageQueueTransaction property: the property
+            // getter calls Begin() on a transaction that is not pending, which is
+            // pointless (and may fail) right before disposing it.
+            if(messageQueueTransaction != null)
+            {
+                messageQueueTransaction.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Creates a producer that is not bound to a destination.
+        /// </summary>
+        public IMessageProducer CreateProducer()
+        {
+            return CreateProducer(null);
+        }
+
+        /// <summary>
+        /// Creates a producer for the given destination (may be null) and hands it
+        /// this session's ProducerTransformer.
+        /// </summary>
+        public IMessageProducer CreateProducer(IDestination destination)
+        {
+            MessageProducer producer = new MessageProducer(this, (Destination) destination);
+            producer.ProducerTransformer = this.ProducerTransformer;
+            return producer;
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination)
+        {
+            return CreateConsumer(destination, null);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+        {
+            return CreateConsumer(destination, selector, false);
+        }
+
+        /// <summary>
+        /// Creates a consumer for the given destination and hands it this session's
+        /// ConsumerTransformer. The noLocal argument is not used.
+        /// </summary>
+        /// <exception cref="NotSupportedException">A non-null selector was passed.</exception>
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+        {
+            if(selector != null)
+            {
+                throw new NotSupportedException("Selectors are not supported by MSMQ");
+            }
+            MessageQueue queue = MessageConverter.ToMsmqDestination(destination);
+            MessageConsumer consumer = new MessageConsumer(this, acknowledgementMode, queue);
+            consumer.ConsumerTransformer = this.ConsumerTransformer;
+            return consumer;
+        }
+
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+        {
+            throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
+        }
+
+        public void DeleteDurableConsumer(string name)
+        {
+            throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IQueue GetQueue(string name)
+        {
+            return new Queue(name);
+        }
+
+        public ITopic GetTopic(string name)
+        {
+            throw new NotSupportedException("Topics are not supported by MSMQ");
+        }
+
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            throw new NotSupportedException("Tempoary Queues are not supported by MSMQ");
+        }
+
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            throw new NotSupportedException("Tempoary Topics are not supported by MSMQ");
+        }
+
+        /// <summary>
+        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+        /// </summary>
+        public void DeleteDestination(IDestination destination)
+        {
+            // TODO: Implement if possible.  If not possible, then change exception to NotSupportedException().
+            throw new NotImplementedException();
+        }
+
+        public IMessage CreateMessage()
+        {
+            BaseMessage answer = new BaseMessage();
+            return answer;
+        }
+
+
+        public ITextMessage CreateTextMessage()
+        {
+            TextMessage answer = new TextMessage();
+            return answer;
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            TextMessage answer = new TextMessage(text);
+            return answer;
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return new MapMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return new BytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            BytesMessage answer = new BytesMessage();
+            answer.Content = body;
+            return answer;
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return new StreamMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(Object body)
+        {
+            ObjectMessage answer = new ObjectMessage();
+            answer.Body = body;
+            return answer;
+        }
+
+        /// <summary>
+        /// Commits the session's transaction.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">The session is not transacted.</exception>
+        public void Commit()
+        {
+            if(!Transacted)
+            {
+                throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
+            }
+            messageQueueTransaction.Commit();
+        }
+
+        /// <summary>
+        /// Aborts the session's transaction.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">The session is not transacted.</exception>
+        public void Rollback()
+        {
+            if(!Transacted)
+            {
+                // Name the operation that was actually attempted.
+                throw new InvalidOperationException("You cannot perform a Rollback() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
+            }
+            messageQueueTransaction.Abort();
+        }
+
+        // Properties
+        public Connection Connection
+        {
+            get { return connection; }
+        }
+
+        /// <summary>
+        /// The default timeout for network requests. The setter ignores its value.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        /// <summary>
+        /// True when the session was created with AcknowledgementMode.Transactional.
+        /// </summary>
+        public bool Transacted
+        {
+            get { return acknowledgementMode == AcknowledgementMode.Transactional; }
+        }
+
+        /// <summary>
+        /// The acknowledgement mode this session was created with.
+        /// </summary>
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+        }
+
+        /// <summary>
+        /// The session's transaction, or null for a non-transacted session. Reading
+        /// the property calls Begin() on the transaction when its status is not
+        /// Pending, so the returned transaction is ready for use.
+        /// </summary>
+        public MessageQueueTransaction MessageQueueTransaction
+        {
+            get
+            {
+                if(null != messageQueueTransaction
+                    && messageQueueTransaction.Status != MessageQueueTransactionStatus.Pending)
+                {
+                    messageQueueTransaction.Begin();
+                }
+
+                return messageQueueTransaction;
+            }
+            set { messageQueueTransaction = value; }
+        }
+
+        public IMessageConverter MessageConverter
+        {
+            get { return messageConverter; }
+            set { messageConverter = value; }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+
+        /// <summary>
+        /// Transformer handed to every consumer created by this session after the
+        /// property is set.
+        /// </summary>
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+
+        /// <summary>
+        /// Transformer handed to every producer created by this session after the
+        /// property is set.
+        /// </summary>
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Same as Dispose().
+        /// </summary>
+        public void Close()
+        {
+            Dispose();
+        }
+    }
 }


Mime
View raw message