activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r990623 [1/2] - in /activemq/activemq-dotnet: Apache.NMS.ActiveMQ/trunk/src/main/csharp/ Apache.NMS.MSMQ/trunk/src/main/csharp/ Apache.NMS.Stomp/trunk/ Apache.NMS.Stomp/trunk/src/main/csharp/ Apache.NMS.Stomp/trunk/src/test/csharp/ Apache.N...
Date Sun, 29 Aug 2010 18:44:39 GMT
Author: tabish
Date: Sun Aug 29 18:44:38 2010
New Revision: 990623

URL: http://svn.apache.org/viewvc?rev=990623&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQNET-271

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageTransformerTest.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageTransformerTest.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnection.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/IMessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/ISession.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/NMSConnectionFactory.cs
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/MessageTransformation.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Sun Aug 29 18:44:38 2010
@@ -33,7 +33,7 @@ namespace Apache.NMS.ActiveMQ
     public class Connection : IConnection
     {
         private static readonly IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
-
+				
         // Uri configurable options.
         private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
         private bool asyncSend = false;
@@ -71,7 +71,7 @@ namespace Apache.NMS.ActiveMQ
         private ICompressionPolicy compressionPolicy = new CompressionPolicy();
         private IdGenerator clientIdGenerator;
         private volatile CountDownLatch transportInterruptionProcessingComplete;
-        private MessageTransformation messageTransformation = null;
+        private readonly MessageTransformation messageTransformation;
 
         public Connection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
         {
@@ -116,8 +116,22 @@ namespace Apache.NMS.ActiveMQ
         /// </summary>
         public event ConnectionResumedListener ConnectionResumedListener;
 
-        #region Properties
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
 
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        #region Properties
+		
         public String UserName
         {
             get { return this.info.UserName; }
@@ -430,6 +444,9 @@ namespace Apache.NMS.ActiveMQ
 			options = URISupport.GetProperties(options, "session.");
             URISupport.SetProperties(session, options);
 
+			session.ConsumerTransformer = this.ConsumerTransformer;
+			session.ProducerTransformer = this.ProducerTransformer;
+
             if(IsStarted)
             {
                 session.Start();

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Sun Aug 29 18:44:38 2010
@@ -352,6 +352,20 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
+		private ConsumerTransformerDelegate consumerTransformer;
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		private ProducerTransformerDelegate producerTransformer;
+		public ProducerTransformerDelegate ProducerTransformer
+		{
+			get { return this.producerTransformer; }
+			set { this.producerTransformer = value; }
+		}
+		
         #endregion
 
         protected virtual void ConfigureConnection(Connection connection)
@@ -370,6 +384,8 @@ namespace Apache.NMS.ActiveMQ
             connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
             connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
             connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
+			connection.ConsumerTransformer = this.consumerTransformer;
+			connection.ProducerTransformer = this.producerTransformer;
         }
 
 		protected static void ExceptionHandler(Exception ex)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Sun Aug 29 18:44:38 2010
@@ -40,6 +40,7 @@ namespace Apache.NMS.ActiveMQ
 	/// </summary>
 	public class MessageConsumer : IMessageConsumer, IDispatcher
 	{
+        private readonly MessageTransformation messageTransformation;
 		private readonly MessageDispatchChannel unconsumedMessages;
 		private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
 		private readonly ConsumerInfo info;
@@ -80,6 +81,7 @@ namespace Apache.NMS.ActiveMQ
             
 			this.session = session;
             this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+            this.messageTransformation = this.session.Connection.MessageTransformation;
 
             if(session.Connection.MessagePrioritySupported)
             {
@@ -169,11 +171,18 @@ namespace Apache.NMS.ActiveMQ
 			get { return ignoreExpiration; }
 			set { ignoreExpiration = value; }
 		}
-
+		
 		#endregion
 
 		#region IMessageConsumer Members
 
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
 		public event MessageListener Listener
 		{
 			add
@@ -1064,6 +1073,15 @@ namespace Apache.NMS.ActiveMQ
 		{
 			ActiveMQMessage message = dispatch.Message.Clone() as ActiveMQMessage;
 
+			if(this.ConsumerTransformer != null)
+			{
+				IMessage newMessage = ConsumerTransformer(this.session, this, message);
+				if(newMessage != null)
+				{
+					message = this.messageTransformation.TransformMessage<ActiveMQMessage>(newMessage);
+				}
+			}
+			
 			message.Connection = this.session.Connection;
 
 			if(IsClientAcknowledge)

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Sun Aug 29 18:44:38 2010
@@ -201,6 +201,15 @@ namespace Apache.NMS.ActiveMQ
             {
                 throw new NotSupportedException("This producer can only send messages to: " + this.info.Destination.PhysicalName);
             }
+			
+			if(this.ProducerTransformer != null)
+			{
+				IMessage transformed = this.ProducerTransformer(this.session, this, message);
+				if(transformed != null)
+				{
+					message = transformed;
+				}
+			}
 
             ActiveMQMessage activeMessage = this.messageTransformation.TransformMessage<ActiveMQMessage>(message);
 
@@ -287,6 +296,13 @@ namespace Apache.NMS.ActiveMQ
             get { return disableMessageTimestamp; }
             set { this.disableMessageTimestamp = value; }
         }
+		
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
 
         public IMessage CreateMessage()
         {
@@ -328,7 +344,7 @@ namespace Apache.NMS.ActiveMQ
             return session.CreateStreamMessage();
         }
 
-        public void OnProducerAck(ProducerAck ack)
+        internal void OnProducerAck(ProducerAck ack)
         {
             Tracer.Debug("Received ProducerAck for Message of Size = {" + ack.Size + "}" );
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Sun Aug 29 18:44:38 2010
@@ -217,6 +217,20 @@ namespace Apache.NMS.ActiveMQ
             set { this.closeStopTimeout = TimeSpan.FromMilliseconds(value); }
         }
 
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
         #endregion
 
         #region ISession Members
@@ -368,6 +382,8 @@ namespace Apache.NMS.ActiveMQ
 
                 producer = new MessageProducer(this, GetNextProducerId(), dest, this.RequestTimeout);
 
+                producer.ProducerTransformer = this.ProducerTransformer;
+
                 this.AddProducer(producer);
                 this.Connection.Oneway(producer.ProducerInfo);
             }
@@ -422,6 +438,8 @@ namespace Apache.NMS.ActiveMQ
                                                this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
                                                noLocal, false, this.connection.DispatchAsync);
 				
+				consumer.ConsumerTransformer = this.ConsumerTransformer;
+
                 this.AddConsumer(consumer);
                 this.Connection.SyncRequest(consumer.ConsumerInfo);
 
@@ -461,6 +479,8 @@ namespace Apache.NMS.ActiveMQ
                                                this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
                                                noLocal, false, this.connection.DispatchAsync);
 
+                consumer.ConsumerTransformer = this.ConsumerTransformer;
+			
                 this.AddConsumer(consumer);
                 this.Connection.SyncRequest(consumer.ConsumerInfo);
 

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Connection.cs Sun Aug 29 18:44:38 2010
@@ -19,198 +19,212 @@ using System;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// Represents a NMS connection MSMQ.  Since the underlying MSMQ APIs are actually
-	/// connectionless, NMS connection in the MSMQ case are not expensive operations.
-	/// </summary>
-	///
-	public class Connection : IConnection
-	{
-		private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
-		private IMessageConverter messageConverter = new DefaultMessageConverter();
-
-		private IRedeliveryPolicy redeliveryPolicy;
-		private ConnectionMetaData metaData = null;
-		private bool connected;
-		private bool closed;
-		private string clientId;
-
-		/// <summary>
-		/// Starts message delivery for this connection.
-		/// </summary>
-		public void Start()
-		{
-			CheckConnected();
-		}
-
-		/// <summary>
-		/// This property determines if the asynchronous message delivery of incoming
-		/// messages has been started for this connection.
-		/// </summary>
-		public bool IsStarted
-		{
-			get { return true; }
-		}
-
-		/// <summary>
-		/// Stop message delivery for this connection.
-		/// </summary>
-		public void Stop()
-		{
-			CheckConnected();
-		}
-
-		/// <summary>
-		/// Creates a new session to work on this connection
-		/// </summary>
-		public ISession CreateSession()
-		{
-			return CreateSession(acknowledgementMode);
-		}
-
-		/// <summary>
-		/// Creates a new session to work on this connection
-		/// </summary>
-		public ISession CreateSession(AcknowledgementMode mode)
-		{
-			CheckConnected();
-			return new Session(this, mode);
-		}
-
-		public void Dispose()
-		{
-			closed = true;
-		}
-
-		/// <summary>
-		/// The default timeout for network requests.
-		/// </summary>
-		public TimeSpan RequestTimeout
-		{
-			get { return NMSConstants.defaultRequestTimeout; }
-			set { }
-		}
-
-		public AcknowledgementMode AcknowledgementMode
-		{
-			get { return acknowledgementMode; }
-			set { acknowledgementMode = value; }
-		}
-
-		public IMessageConverter MessageConverter
-		{
-			get { return messageConverter; }
-			set { messageConverter = value; }
-		}
-
-		public string ClientId
-		{
-			get { return clientId; }
-			set
-			{
-				if(connected)
-				{
-					throw new NMSException("You cannot change the ClientId once the Connection is connected");
-				}
-				clientId = value;
-			}
-		}
-
-		/// <summary>
-		/// Get/or set the redelivery policy for this connection.
-		/// </summary>
-		public IRedeliveryPolicy RedeliveryPolicy
-		{
-			get { return this.redeliveryPolicy; }
-			set { this.redeliveryPolicy = value; }
-		}
-
-		/// <summary>
-		/// Gets the Meta Data for the NMS Connection instance.
-		/// </summary>
-		public IConnectionMetaData MetaData
-		{
-			get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
-		}
-
-		/// <summary>
-		/// A delegate that can receive transport level exceptions.
-		/// </summary>
-		public event ExceptionListener ExceptionListener;
-
-		/// <summary>
-		/// An asynchronous listener that is notified when a Fault tolerant connection
-		/// has been interrupted.
-		/// </summary>
-		public event ConnectionInterruptedListener ConnectionInterruptedListener;
-
-		/// <summary>
-		/// An asynchronous listener that is notified when a Fault tolerant connection
-		/// has been resumed.
-		/// </summary>
-		public event ConnectionResumedListener ConnectionResumedListener;
-
-		protected void CheckConnected()
-		{
-			if(closed)
-			{
-				throw new NMSException("Connection Closed");
-			}
-			if(!connected)
-			{
-				connected = true;
-				// now lets send the connection and see if we get an ack/nak
-				// TODO: establish a connection
-			}
-		}
-
-		public void Close()
-		{
-			Dispose();
-		}
-
-		public void HandleException(Exception e)
-		{
-			if(ExceptionListener != null && !this.closed)
-			{
-				ExceptionListener(e);
-			}
-			else
-			{
-				Tracer.Error(e);
-			}
-		}
-
-		public void HandleTransportInterrupted()
-		{
-			Tracer.Debug("Transport has been Interrupted.");
-
-			if(this.ConnectionInterruptedListener != null && !this.closed)
-			{
-				try
-				{
-					this.ConnectionInterruptedListener();
-				}
-				catch
-				{
-				}
-			}
-		}
-
-		public void HandleTransportResumed()
-		{
-			Tracer.Debug("Transport has resumed normal operation.");
-
-			if(this.ConnectionResumedListener != null && !this.closed)
-			{
-				try
-				{
-					this.ConnectionResumedListener();
-				}
-				catch
-				{
-				}
-			}
-		}
-	}
+    /// <summary>
+    /// Represents an NMS connection to MSMQ.  Since the underlying MSMQ APIs are actually
+    /// connectionless, NMS connections in the MSMQ case are not expensive operations.
+    /// </summary>
+    ///
+    public class Connection : IConnection
+    {
+        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+        private IMessageConverter messageConverter = new DefaultMessageConverter();
+
+        private IRedeliveryPolicy redeliveryPolicy;
+        private ConnectionMetaData metaData = null;
+        private bool connected;
+        private bool closed;
+        private string clientId;
+
+        /// <summary>
+        /// Starts message delivery for this connection.
+        /// </summary>
+        public void Start()
+        {
+            CheckConnected();
+        }
+
+        /// <summary>
+        /// This property determines if the asynchronous message delivery of incoming
+        /// messages has been started for this connection.
+        /// </summary>
+        public bool IsStarted
+        {
+            get { return true; }
+        }
+
+        /// <summary>
+        /// Stop message delivery for this connection.
+        /// </summary>
+        public void Stop()
+        {
+            CheckConnected();
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection
+        /// </summary>
+        public ISession CreateSession()
+        {
+            return CreateSession(acknowledgementMode);
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection
+        /// </summary>
+        public ISession CreateSession(AcknowledgementMode mode)
+        {
+            CheckConnected();
+            return new Session(this, mode);
+        }
+
+        public void Dispose()
+        {
+            closed = true;
+        }
+
+        /// <summary>
+        /// The default timeout for network requests.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+            set { acknowledgementMode = value; }
+        }
+
+        public IMessageConverter MessageConverter
+        {
+            get { return messageConverter; }
+            set { messageConverter = value; }
+        }
+
+        public string ClientId
+        {
+            get { return clientId; }
+            set
+            {
+                if(connected)
+                {
+                    throw new NMSException("You cannot change the ClientId once the Connection is connected");
+                }
+                clientId = value;
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the redelivery policy for this connection.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set { this.redeliveryPolicy = value; }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Gets the Meta Data for the NMS Connection instance.
+        /// </summary>
+        public IConnectionMetaData MetaData
+        {
+            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+        }
+
+        /// <summary>
+        /// A delegate that can receive transport level exceptions.
+        /// </summary>
+        public event ExceptionListener ExceptionListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been interrupted.
+        /// </summary>
+        public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been resumed.
+        /// </summary>
+        public event ConnectionResumedListener ConnectionResumedListener;
+
+        protected void CheckConnected()
+        {
+            if(closed)
+            {
+                throw new NMSException("Connection Closed");
+            }
+            if(!connected)
+            {
+                connected = true;
+                // now lets send the connection and see if we get an ack/nak
+                // TODO: establish a connection
+            }
+        }
+
+        public void Close()
+        {
+            Dispose();
+        }
+
+        public void HandleException(Exception e)
+        {
+            if(ExceptionListener != null && !this.closed)
+            {
+                ExceptionListener(e);
+            }
+            else
+            {
+                Tracer.Error(e);
+            }
+        }
+
+        public void HandleTransportInterrupted()
+        {
+            Tracer.Debug("Transport has been Interrupted.");
+
+            if(this.ConnectionInterruptedListener != null && !this.closed)
+            {
+                try
+                {
+                    this.ConnectionInterruptedListener();
+                }
+                catch
+                {
+                }
+            }
+        }
+
+        public void HandleTransportResumed()
+        {
+            Tracer.Debug("Transport has resumed normal operation.");
+
+            if(this.ConnectionResumedListener != null && !this.closed)
+            {
+                try
+                {
+                    this.ConnectionResumedListener();
+                }
+                catch
+                {
+                }
+            }
+        }
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/ConnectionFactory.cs Sun Aug 29 18:44:38 2010
@@ -19,101 +19,119 @@ using Apache.NMS.Policies;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// A Factory that can estbalish NMS connections to MSMQ
-	/// </summary>
-	public class ConnectionFactory : IConnectionFactory
-	{
-		public const string DEFAULT_BROKER_URL = "msmq://localhost";
-		public const string ENV_BROKER_URL = "MSMQ_BROKER_URL";
-		private Uri brokerUri;
-		private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
-
-		public static string GetDefaultBrokerUrl()
-		{
-			string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
-			if(answer == null)
-			{
-				answer = DEFAULT_BROKER_URL;
-			}
-			return answer;
-		}
-
-		public ConnectionFactory()
-			: this(GetDefaultBrokerUrl())
-		{
-		}
-
-		public ConnectionFactory(string brokerUri)
-			: this(brokerUri, null)
-		{
-		}
-
-		public ConnectionFactory(string brokerUri, string clientID)
-			: this(new Uri(brokerUri), clientID)
-		{
-		}
-
-		public ConnectionFactory(Uri brokerUri)
-			: this(brokerUri, null)
-		{
-		}
-
-		public ConnectionFactory(Uri brokerUri, string clientID)
-		{
-			this.brokerUri = brokerUri;
-		}
-
-		/// <summary>
-		/// Creates a new connection to MSMQ.
-		/// </summary>
-		public IConnection CreateConnection()
-		{
-			return CreateConnection(string.Empty, string.Empty, false);
-		}
-
-		/// <summary>
-		/// Creates a new connection to MSMQ.
-		/// </summary>
-		public IConnection CreateConnection(string userName, string password)
-		{
-			return CreateConnection(userName, password, false);
-		}
-
-		/// <summary>
-		/// Creates a new connection to MSMQ.
-		/// </summary>
-		public IConnection CreateConnection(string userName, string password, bool useLogging)
-		{
-			IConnection connection = new Connection();
-
-			connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
-			return connection;
-		}
-
-		/// <summary>
-		/// Get/or set the broker Uri.
-		/// </summary>
-		public Uri BrokerUri
-		{
-			get { return brokerUri; }
-			set { brokerUri = value; }
-		}
-
-		/// <summary>
-		/// Get/or set the redelivery policy that new IConnection objects are
-		/// assigned upon creation.
-		/// </summary>
-		public IRedeliveryPolicy RedeliveryPolicy
-		{
-			get { return this.redeliveryPolicy; }
-			set
-			{
-				if(value != null)
-				{
-					this.redeliveryPolicy = value;
-				}
-			}
-		}
-	}
+    /// <summary>
+    /// A Factory that can establish NMS connections to MSMQ
+    /// </summary>
+    public class ConnectionFactory : IConnectionFactory
+    {
+        public const string DEFAULT_BROKER_URL = "msmq://localhost";
+        public const string ENV_BROKER_URL = "MSMQ_BROKER_URL";
+        private Uri brokerUri;
+        private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+
+        public static string GetDefaultBrokerUrl()
+        {
+            string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
+            if(answer == null)
+            {
+                answer = DEFAULT_BROKER_URL;
+            }
+            return answer;
+        }
+
+        public ConnectionFactory()
+            : this(GetDefaultBrokerUrl())
+        {
+        }
+
+        public ConnectionFactory(string brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        public ConnectionFactory(string brokerUri, string clientID)
+            : this(new Uri(brokerUri), clientID)
+        {
+        }
+
+        public ConnectionFactory(Uri brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        public ConnectionFactory(Uri brokerUri, string clientID)
+        {
+            this.brokerUri = brokerUri;
+        }
+
+        /// <summary>
+        /// Creates a new connection to MSMQ.
+        /// </summary>
+        public IConnection CreateConnection()
+        {
+            return CreateConnection(string.Empty, string.Empty, false);
+        }
+
+        /// <summary>
+        /// Creates a new connection to MSMQ.
+        /// </summary>
+        public IConnection CreateConnection(string userName, string password)
+        {
+            return CreateConnection(userName, password, false);
+        }
+
+        /// <summary>
+        /// Creates a new connection to MSMQ.
+        /// </summary>
+        public IConnection CreateConnection(string userName, string password, bool useLogging)
+        {
+            IConnection connection = new Connection();
+
+            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
+            connection.ConsumerTransformer = this.consumerTransformer;
+            connection.ProducerTransformer = this.producerTransformer;
+
+            return connection;
+        }
+
+        /// <summary>
+        /// Gets or sets the broker Uri.
+        /// </summary>
+        public Uri BrokerUri
+        {
+            get { return brokerUri; }
+            set { brokerUri = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the redelivery policy that new IConnection objects are
+        /// assigned upon creation.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set
+            {
+                if(value != null)
+                {
+                    this.redeliveryPolicy = value;
+                }
+            }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs Sun Aug 29 18:44:38 2010
@@ -21,211 +21,230 @@ using Apache.NMS.Util;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// An object capable of receiving messages from some destination
-	/// </summary>
-	public class MessageConsumer : IMessageConsumer
-	{
-		protected TimeSpan zeroTimeout = new TimeSpan(0);
-
-		private readonly Session session;
-		private readonly AcknowledgementMode acknowledgementMode;
-		private MessageQueue messageQueue;
-		private event MessageListener listener;
-		private int listenerCount = 0;
-		private Thread asyncDeliveryThread = null;
-		private AutoResetEvent pause = new AutoResetEvent(false);
-		private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
-
-		public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
-		{
-			this.session = session;
-			this.acknowledgementMode = acknowledgementMode;
-			this.messageQueue = messageQueue;
-			if(null != this.messageQueue)
-			{
-				this.messageQueue.MessageReadPropertyFilter.SetAll();
-			}
-		}
-
-		public event MessageListener Listener
-		{
-			add
-			{
-				listener += value;
-				listenerCount++;
-				StartAsyncDelivery();
-			}
-
-			remove
-			{
-				if(listenerCount > 0)
-				{
-					listener -= value;
-					listenerCount--;
-				}
-
-				if(0 == listenerCount)
-				{
-					StopAsyncDelivery();
-				}
-			}
-		}
-
-		public IMessage Receive()
-		{
-			IMessage nmsMessage = null;
-
-			if(messageQueue != null)
-			{
-				Message message;
-
-				try
-				{
-					message = messageQueue.Receive(zeroTimeout);
-				}
-				catch
-				{
-					message = null;
-				}
-
-				if(null == message)
-				{
-					ReceiveCompletedEventHandler receiveMsg =
-							delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
-								message = messageQueue.EndReceive(asyncResult.AsyncResult);
-								pause.Set();
-							};
-
-					messageQueue.ReceiveCompleted += receiveMsg;
-					messageQueue.BeginReceive();
-					pause.WaitOne();
-					messageQueue.ReceiveCompleted -= receiveMsg;
-				}
-
-				nmsMessage = ToNmsMessage(message);
-			}
-
-			return nmsMessage;
-		}
-
-		public IMessage Receive(TimeSpan timeout)
-		{
-			IMessage nmsMessage = null;
-
-			if(messageQueue != null)
-			{
-				Message message = messageQueue.Receive(timeout);
-				nmsMessage = ToNmsMessage(message);
-			}
-
-			return nmsMessage;
-		}
-
-		public IMessage ReceiveNoWait()
-		{
-			IMessage nmsMessage = null;
-
-			if(messageQueue != null)
-			{
-				Message message = messageQueue.Receive(zeroTimeout);
-				nmsMessage = ToNmsMessage(message);
-			}
-
-			return nmsMessage;
-		}
-
-		public void Dispose()
-		{
-			Close();
-		}
-
-		public void Close()
-		{
-			StopAsyncDelivery();
-			if(messageQueue != null)
-			{
-				messageQueue.Dispose();
-				messageQueue = null;
-			}
-		}
-
-		protected virtual void StopAsyncDelivery()
-		{
-			if(asyncDelivery.CompareAndSet(true, false))
-			{
-				if(null != asyncDeliveryThread)
-				{
-					Tracer.Info("Stopping async delivery thread.");
-					pause.Set();
-					if(!asyncDeliveryThread.Join(10000))
-					{
-						Tracer.Info("Aborting async delivery thread.");
-						asyncDeliveryThread.Abort();
-					}
-
-					asyncDeliveryThread = null;
-					Tracer.Info("Async delivery thread stopped.");
-				}
-			}
-		}
-
-		protected virtual void StartAsyncDelivery()
-		{
-			if(asyncDelivery.CompareAndSet(false, true))
-			{
-				asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
-				asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName;
-				asyncDeliveryThread.IsBackground = true;
-				asyncDeliveryThread.Start();
-			}
-		}
-
-		protected virtual void DispatchLoop()
-		{
-			Tracer.Info("Starting dispatcher thread consumer: " + this);
-			while(asyncDelivery.Value)
-			{
-				try
-				{
-					IMessage message = Receive();
-					if(asyncDelivery.Value && message != null)
-					{
-						try
-						{
-							listener(message);
-						}
-						catch(Exception e)
-						{
-							HandleAsyncException(e);
-						}
-					}
-				}
-				catch(ThreadAbortException ex)
-				{
-					Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
-					break;
-				}
-				catch(Exception ex)
-				{
-					Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
-				}
-			}
-			Tracer.Info("Stopping dispatcher thread consumer: " + this);
-		}
-
-		protected virtual void HandleAsyncException(Exception e)
-		{
-			session.Connection.HandleException(e);
-		}
-
-		protected virtual IMessage ToNmsMessage(Message message)
-		{
-			if(message == null)
-			{
-				return null;
-			}
-			return session.MessageConverter.ToNmsMessage(message);
-		}
-	}
+    /// <summary>
+    /// An object capable of receiving messages from some destination
+    /// </summary>
+    public class MessageConsumer : IMessageConsumer
+    {
+        protected TimeSpan zeroTimeout = new TimeSpan(0);
+
+        private readonly Session session;
+        private readonly AcknowledgementMode acknowledgementMode;
+        private MessageQueue messageQueue;
+        private event MessageListener listener;
+        private int listenerCount = 0;
+        private Thread asyncDeliveryThread = null;
+        private AutoResetEvent pause = new AutoResetEvent(false);
+        private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
+        {
+            this.session = session;
+            this.acknowledgementMode = acknowledgementMode;
+            this.messageQueue = messageQueue;
+            if(null != this.messageQueue)
+            {
+                this.messageQueue.MessageReadPropertyFilter.SetAll();
+            }
+        }
+
+        public event MessageListener Listener
+        {
+            add
+            {
+                listener += value;
+                listenerCount++;
+                StartAsyncDelivery();
+            }
+
+            remove
+            {
+                if(listenerCount > 0)
+                {
+                    listener -= value;
+                    listenerCount--;
+                }
+
+                if(0 == listenerCount)
+                {
+                    StopAsyncDelivery();
+                }
+            }
+        }
+
+        public IMessage Receive()
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                Message message;
+
+                try
+                {
+                    message = messageQueue.Receive(zeroTimeout);
+                }
+                catch
+                {
+                    message = null;
+                }
+
+                if(null == message)
+                {
+                    ReceiveCompletedEventHandler receiveMsg =
+                            delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
+                                message = messageQueue.EndReceive(asyncResult.AsyncResult);
+                                pause.Set();
+                            };
+
+                    messageQueue.ReceiveCompleted += receiveMsg;
+                    messageQueue.BeginReceive();
+                    pause.WaitOne();
+                    messageQueue.ReceiveCompleted -= receiveMsg;
+                }
+
+                nmsMessage = ToNmsMessage(message);
+            }
+
+            return nmsMessage;
+        }
+
+        public IMessage Receive(TimeSpan timeout)
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                Message message = messageQueue.Receive(timeout);
+                nmsMessage = ToNmsMessage(message);
+            }
+
+            return nmsMessage;
+        }
+
+        public IMessage ReceiveNoWait()
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                Message message = messageQueue.Receive(zeroTimeout);
+                nmsMessage = ToNmsMessage(message);
+            }
+
+            return nmsMessage;
+        }
+
+        public void Dispose()
+        {
+            Close();
+        }
+
+        public void Close()
+        {
+            StopAsyncDelivery();
+            if(messageQueue != null)
+            {
+                messageQueue.Dispose();
+                messageQueue = null;
+            }
+        }
+
+        protected virtual void StopAsyncDelivery()
+        {
+            if(asyncDelivery.CompareAndSet(true, false))
+            {
+                if(null != asyncDeliveryThread)
+                {
+                    Tracer.Info("Stopping async delivery thread.");
+                    pause.Set();
+                    if(!asyncDeliveryThread.Join(10000))
+                    {
+                        Tracer.Info("Aborting async delivery thread.");
+                        asyncDeliveryThread.Abort();
+                    }
+
+                    asyncDeliveryThread = null;
+                    Tracer.Info("Async delivery thread stopped.");
+                }
+            }
+        }
+
+        protected virtual void StartAsyncDelivery()
+        {
+            if(asyncDelivery.CompareAndSet(false, true))
+            {
+                asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop));
+                asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName;
+                asyncDeliveryThread.IsBackground = true;
+                asyncDeliveryThread.Start();
+            }
+        }
+
+        protected virtual void DispatchLoop()
+        {
+            Tracer.Info("Starting dispatcher thread consumer: " + this);
+            while(asyncDelivery.Value)
+            {
+                try
+                {
+                    IMessage message = Receive();
+                    if(asyncDelivery.Value && message != null)
+                    {
+                        try
+                        {
+                            listener(message);
+                        }
+                        catch(Exception e)
+                        {
+                            HandleAsyncException(e);
+                        }
+                    }
+                }
+                catch(ThreadAbortException ex)
+                {
+                    Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
+                    break;
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message);
+                }
+            }
+            Tracer.Info("Stopping dispatcher thread consumer: " + this);
+        }
+
+        protected virtual void HandleAsyncException(Exception e)
+        {
+            session.Connection.HandleException(e);
+        }
+
+        protected virtual IMessage ToNmsMessage(Message message)
+        {
+            if(message == null)
+            {
+                return null;
+            }
+
+            IMessage converted = session.MessageConverter.ToNmsMessage(message);
+            // The transformer works on NMS messages, so hand it the converted message
+            // rather than the raw MSMQ one; a null result keeps the converted message.
+            if(this.ConsumerTransformer != null)
+            {
+                IMessage newMessage = ConsumerTransformer(this.session, this, converted);
+                if(newMessage != null)
+                {
+                    converted = newMessage;
+                }
+            }
+            return converted;
+        }
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageProducer.cs Sun Aug 29 18:44:38 2010
@@ -19,258 +19,274 @@ using System.Messaging;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// An object capable of sending messages to some destination
-	/// </summary>
-	public class MessageProducer : IMessageProducer
-	{
-
-		private readonly Session session;
-		private Destination destination;
-
-		//private long messageCounter;
-		private MsgDeliveryMode deliveryMode;
-		private TimeSpan timeToLive;
-		private MsgPriority priority;
-		private bool disableMessageID;
-		private bool disableMessageTimestamp;
-
-		private MessageQueue messageQueue;
-
-		public MessageProducer(Session session, Destination destination)
-		{
-			this.session = session;
-			this.destination = destination;
-			if(destination != null)
-			{
-				messageQueue = openMessageQueue(destination);
-			}
-		}
-
-		private MessageQueue openMessageQueue(Destination dest)
-		{
-			MessageQueue rc = null;
-			try
-			{
-				if(!MessageQueue.Exists(dest.Path))
-				{
-					// create the new message queue and make it transactional
-					rc = MessageQueue.Create(dest.Path, session.Transacted);
-					this.destination.Path = rc.Path;
-				}
-				else
-				{
-					rc = new MessageQueue(dest.Path);
-					this.destination.Path = rc.Path;
-					if(!rc.CanWrite)
-					{
-						throw new NMSSecurityException("Do not have write access to: " + dest);
-					}
-				}
-			}
-			catch(Exception e)
-			{
-				if(rc != null)
-				{
-					rc.Dispose();
-				}
-
-				throw new NMSException(e.Message + ": " + dest, e);
-			}
-			return rc;
-		}
-
-		public void Send(IMessage message)
-		{
-			Send(Destination, message);
-		}
-
-		public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-		{
-			Send(Destination, message, deliveryMode, priority, timeToLive);
-		}
-
-		public void Send(IDestination destination, IMessage message)
-		{
-			Send(destination, message, DeliveryMode, Priority, TimeToLive);
-		}
-
-		public void Send(IDestination destination, IMessage imessage, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-		{
-			BaseMessage message = (BaseMessage) imessage;
-			MessageQueue mq = null;
-
-			try
-			{
-				// Locate the MSMQ Queue we will be sending to
-				if(messageQueue != null)
-				{
-					if(destination.Equals(this.destination))
-					{
-						mq = messageQueue;
-					}
-					else
-					{
-						throw new NMSException("This producer can only be used to send to: " + destination);
-					}
-				}
-				else
-				{
-					mq = openMessageQueue((Destination) destination);
-				}
-
-				message.NMSDeliveryMode = deliveryMode;
-				message.NMSTimeToLive = timeToLive;
-				message.NMSPriority = priority;
-				if(!DisableMessageTimestamp)
-				{
-					message.NMSTimestamp = DateTime.UtcNow;
-				}
-
-				if(!DisableMessageID)
-				{
-					// TODO: message.NMSMessageId =
-				}
-
-				// Convert the Mesasge into a MSMQ message
-				Message msg = session.MessageConverter.ToMsmqMessage(message);
-
-				if(mq.Transactional)
-				{
-					if(session.Transacted)
-					{
-						mq.Send(msg, session.MessageQueueTransaction);
-
-					}
-					else
-					{
-						// Start our own mini transaction here to send the message.
-						using(MessageQueueTransaction transaction = new MessageQueueTransaction())
-						{
-							transaction.Begin();
-							mq.Send(msg, transaction);
-							transaction.Commit();
-						}
-					}
-				}
-				else
-				{
-					if(session.Transacted)
-					{
-						// We may want to raise an exception here since app requested
-						// a transeced NMS session, but is using a non transacted message queue
-						// For now silently ignore it.
-					}
-					mq.Send(msg);
-				}
-
-			}
-			finally
-			{
-				if(mq != null && mq != messageQueue)
-				{
-					mq.Dispose();
-				}
-			}
-		}
-
-		public void Close()
-		{
-			if(messageQueue != null)
-			{
-				messageQueue.Dispose();
-				messageQueue = null;
-			}
-		}
-
-		public void Dispose()
-		{
-			Close();
-		}
-
-		public IMessage CreateMessage()
-		{
-			return session.CreateMessage();
-		}
-
-		public ITextMessage CreateTextMessage()
-		{
-			return session.CreateTextMessage();
-		}
-
-		public ITextMessage CreateTextMessage(String text)
-		{
-			return session.CreateTextMessage(text);
-		}
-
-		public IMapMessage CreateMapMessage()
-		{
-			return session.CreateMapMessage();
-		}
-
-		public IObjectMessage CreateObjectMessage(Object body)
-		{
-			return session.CreateObjectMessage(body);
-		}
-
-		public IBytesMessage CreateBytesMessage()
-		{
-			return session.CreateBytesMessage();
-		}
-
-		public IBytesMessage CreateBytesMessage(byte[] body)
-		{
-			return session.CreateBytesMessage(body);
-		}
-
-		public IStreamMessage CreateStreamMessage()
-		{
-			return session.CreateStreamMessage();
-		}
-
-		public MsgDeliveryMode DeliveryMode
-		{
-			get { return deliveryMode; }
-			set { deliveryMode = value; }
-		}
-
-		public TimeSpan TimeToLive
-		{
-			get { return timeToLive; }
-			set { timeToLive = value; }
-		}
-
-		/// <summary>
-		/// The default timeout for network requests.
-		/// </summary>
-		public TimeSpan RequestTimeout
-		{
-			get { return NMSConstants.defaultRequestTimeout; }
-			set { }
-		}
-
-		public IDestination Destination
-		{
-			get { return destination; }
-			set { destination = (Destination) value; }
-		}
-
-		public MsgPriority Priority
-		{
-			get { return priority; }
-			set { priority = value; }
-		}
-
-		public bool DisableMessageID
-		{
-			get { return disableMessageID; }
-			set { disableMessageID = value; }
-		}
-
-		public bool DisableMessageTimestamp
-		{
-			get { return disableMessageTimestamp; }
-			set { disableMessageTimestamp = value; }
-		}
-	}
+    /// <summary>
+    /// An object capable of sending messages to some destination
+    /// </summary>
+    public class MessageProducer : IMessageProducer
+    {
+
+        private readonly Session session;
+        private Destination destination;
+
+        //private long messageCounter;
+        private MsgDeliveryMode deliveryMode;
+        private TimeSpan timeToLive;
+        private MsgPriority priority;
+        private bool disableMessageID;
+        private bool disableMessageTimestamp;
+
+        private MessageQueue messageQueue;
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        public MessageProducer(Session session, Destination destination)
+        {
+            this.session = session;
+            this.destination = destination;
+            if(destination != null)
+            {
+                messageQueue = openMessageQueue(destination);
+            }
+        }
+
+        private MessageQueue openMessageQueue(Destination dest)
+        {
+            MessageQueue rc = null;
+            try
+            {
+                if(!MessageQueue.Exists(dest.Path))
+                {
+                    // create the new message queue and make it transactional
+                    rc = MessageQueue.Create(dest.Path, session.Transacted);
+                    this.destination.Path = rc.Path;
+                }
+                else
+                {
+                    rc = new MessageQueue(dest.Path);
+                    this.destination.Path = rc.Path;
+                    if(!rc.CanWrite)
+                    {
+                        throw new NMSSecurityException("Do not have write access to: " + dest);
+                    }
+                }
+            }
+            catch(Exception e)
+            {
+                if(rc != null)
+                {
+                    rc.Dispose();
+                }
+
+                throw new NMSException(e.Message + ": " + dest, e);
+            }
+            return rc;
+        }
+
+        public void Send(IMessage message)
+        {
+            Send(Destination, message);
+        }
+
+        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            Send(Destination, message, deliveryMode, priority, timeToLive);
+        }
+
+        public void Send(IDestination destination, IMessage message)
+        {
+            Send(destination, message, DeliveryMode, Priority, TimeToLive);
+        }
+
+        public void Send(IDestination destination, IMessage imessage, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            BaseMessage message = (BaseMessage) imessage;
+            MessageQueue mq = null;
+
+            try
+            {
+                // Locate the MSMQ Queue we will be sending to
+                if(messageQueue != null)
+                {
+                    if(destination.Equals(this.destination))
+                    {
+                        mq = messageQueue;
+                    }
+                    else
+                    {
+                        throw new NMSException("This producer can only be used to send to: " + this.destination);
+                    }
+                }
+                else
+                {
+                    mq = openMessageQueue((Destination) destination);
+                }
+
+                // Let the optional transformer replace the outgoing message; a null
+                // result means the original message is sent unchanged.
+                if(this.ProducerTransformer != null)
+                {
+                    IMessage transformed = this.ProducerTransformer(this.session, this, message);
+                    if(transformed != null)
+                    {
+                        message = (BaseMessage) transformed;
+                    }
+                }
+
+                message.NMSDeliveryMode = deliveryMode;
+                message.NMSTimeToLive = timeToLive;
+                message.NMSPriority = priority;
+                if(!DisableMessageTimestamp)
+                {
+                    message.NMSTimestamp = DateTime.UtcNow;
+                }
+
+                if(!DisableMessageID)
+                {
+                    // TODO: message.NMSMessageId =
+                }
+
+                // Convert the Message into a MSMQ message
+                Message msg = session.MessageConverter.ToMsmqMessage(message);
+
+                if(mq.Transactional)
+                {
+                    if(session.Transacted)
+                    {
+                        mq.Send(msg, session.MessageQueueTransaction);
+                    }
+                    else
+                    {
+                        // Start our own mini transaction here to send the message.
+                        using(MessageQueueTransaction transaction = new MessageQueueTransaction())
+                        {
+                            transaction.Begin();
+                            mq.Send(msg, transaction);
+                            transaction.Commit();
+                        }
+                    }
+                }
+                else
+                {
+                    if(session.Transacted)
+                    {
+                        // We may want to raise an exception here since app requested
+                        // a transacted NMS session, but is using a non transacted message queue
+                        // For now silently ignore it.
+                    }
+                    mq.Send(msg);
+                }
+            }
+            finally
+            {
+                if(mq != null && mq != messageQueue)
+                {
+                    mq.Dispose();
+                }
+            }
+        }
+
+        public void Close()
+        {
+            if(messageQueue != null)
+            {
+                messageQueue.Dispose();
+                messageQueue = null;
+            }
+        }
+
+        public void Dispose()
+        {
+            Close();
+        }
+
+        public IMessage CreateMessage()
+        {
+            return session.CreateMessage();
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            return session.CreateTextMessage();
+        }
+
+        public ITextMessage CreateTextMessage(String text)
+        {
+            return session.CreateTextMessage(text);
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return session.CreateMapMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(Object body)
+        {
+            return session.CreateObjectMessage(body);
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return session.CreateBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            return session.CreateBytesMessage(body);
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return session.CreateStreamMessage();
+        }
+
+        public MsgDeliveryMode DeliveryMode
+        {
+            get { return deliveryMode; }
+            set { deliveryMode = value; }
+        }
+
+        public TimeSpan TimeToLive
+        {
+            get { return timeToLive; }
+            set { timeToLive = value; }
+        }
+
+        /// <summary>
+        /// The default timeout for network requests.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        public IDestination Destination
+        {
+            get { return destination; }
+            set { destination = (Destination) value; }
+        }
+
+        public MsgPriority Priority
+        {
+            get { return priority; }
+            set { priority = value; }
+        }
+
+        public bool DisableMessageID
+        {
+            get { return disableMessageID; }
+            set { disableMessageID = value; }
+        }
+
+        public bool DisableMessageTimestamp
+        {
+            get { return disableMessageTimestamp; }
+            set { disableMessageTimestamp = value; }
+        }
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Session.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Session.cs Sun Aug 29 18:44:38 2010
@@ -19,229 +19,243 @@ using System.Messaging;
 
 namespace Apache.NMS.MSMQ
 {
-	/// <summary>
-	/// MSQM provider of ISession
-	/// </summary>
-	public class Session : ISession
-	{
-		private Connection connection;
-		private AcknowledgementMode acknowledgementMode;
-		private MessageQueueTransaction messageQueueTransaction;
-		private IMessageConverter messageConverter;
-
-		public Session(Connection connection, AcknowledgementMode acknowledgementMode)
-		{
-			this.connection = connection;
-			this.acknowledgementMode = acknowledgementMode;
-			MessageConverter = connection.MessageConverter;
-			if(this.acknowledgementMode == AcknowledgementMode.Transactional)
-			{
-				MessageQueueTransaction = new MessageQueueTransaction();
-			}
-		}
-
-		public void Dispose()
-		{
-			if(MessageQueueTransaction != null)
-			{
-				MessageQueueTransaction.Dispose();
-			}
-		}
-
-		public IMessageProducer CreateProducer()
-		{
-			return CreateProducer(null);
-		}
-
-		public IMessageProducer CreateProducer(IDestination destination)
-		{
-			return new MessageProducer(this, (Destination) destination);
-		}
-
-		public IMessageConsumer CreateConsumer(IDestination destination)
-		{
-			return CreateConsumer(destination, null);
-		}
-
-		public IMessageConsumer CreateConsumer(IDestination destination, string selector)
-		{
-			return CreateConsumer(destination, selector, false);
-		}
-
-		public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
-		{
-			if(selector != null)
-			{
-				throw new NotSupportedException("Selectors are not supported by MSMQ");
-			}
-			MessageQueue queue = MessageConverter.ToMsmqDestination(destination);
-			return new MessageConsumer(this, acknowledgementMode, queue);
-		}
-
-		public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
-		{
-			throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
-		}
-
-		public void DeleteDurableConsumer(string name)
-		{
-			throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
-		}
-
-		public IQueueBrowser CreateBrowser(IQueue queue)
-		{
-			throw new NotImplementedException();
-		}
-
-		public IQueueBrowser CreateBrowser(IQueue queue, string selector)
-		{
-			throw new NotImplementedException();
-		}
-
-		public IQueue GetQueue(string name)
-		{
-			return new Queue(name);
-		}
-
-		public ITopic GetTopic(string name)
-		{
-			throw new NotSupportedException("Topics are not supported by MSMQ");
-		}
-
-		public ITemporaryQueue CreateTemporaryQueue()
-		{
-			throw new NotSupportedException("Tempoary Queues are not supported by MSMQ");
-		}
-
-		public ITemporaryTopic CreateTemporaryTopic()
-		{
-			throw new NotSupportedException("Tempoary Topics are not supported by MSMQ");
-		}
-
-		/// <summary>
-		/// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
-		/// </summary>
-		public void DeleteDestination(IDestination destination)
-		{
-			// TODO: Implement if possible.  If not possible, then change exception to NotSupportedException().
-			throw new NotImplementedException();
-		}
-
-		public IMessage CreateMessage()
-		{
-			BaseMessage answer = new BaseMessage();
-			return answer;
-		}
-
-
-		public ITextMessage CreateTextMessage()
-		{
-			TextMessage answer = new TextMessage();
-			return answer;
-		}
-
-		public ITextMessage CreateTextMessage(string text)
-		{
-			TextMessage answer = new TextMessage(text);
-			return answer;
-		}
-
-		public IMapMessage CreateMapMessage()
-		{
-			return new MapMessage();
-		}
-
-		public IBytesMessage CreateBytesMessage()
-		{
-			return new BytesMessage();
-		}
-
-		public IBytesMessage CreateBytesMessage(byte[] body)
-		{
-			BytesMessage answer = new BytesMessage();
-			answer.Content = body;
-			return answer;
-		}
-
-		public IStreamMessage CreateStreamMessage()
-		{
-			return new StreamMessage();
-		}
-
-		public IObjectMessage CreateObjectMessage(Object body)
-		{
-			ObjectMessage answer = new ObjectMessage();
-			answer.Body = body;
-			return answer;
-		}
-
-		public void Commit()
-		{
-			if(!Transacted)
-			{
-				throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
-			}
-			messageQueueTransaction.Commit();
-		}
-
-		public void Rollback()
-		{
-			if(!Transacted)
-			{
-				throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
-			}
-			messageQueueTransaction.Abort();
-		}
-
-		// Properties
-		public Connection Connection
-		{
-			get { return connection; }
-		}
-
-		/// <summary>
-		/// The default timeout for network requests.
-		/// </summary>
-		public TimeSpan RequestTimeout
-		{
-			get { return NMSConstants.defaultRequestTimeout; }
-			set { }
-		}
-
-		public bool Transacted
-		{
-			get { return acknowledgementMode == AcknowledgementMode.Transactional; }
-		}
-
-		public AcknowledgementMode AcknowledgementMode
-		{
-			get { throw new NotImplementedException(); }
-		}
-
-		public MessageQueueTransaction MessageQueueTransaction
-		{
-			get
-			{
-				if(null != messageQueueTransaction
-					&& messageQueueTransaction.Status != MessageQueueTransactionStatus.Pending)
-				{
-					messageQueueTransaction.Begin();
-				}
-
-				return messageQueueTransaction;
-			}
-			set { messageQueueTransaction = value; }
-		}
-
-		public IMessageConverter MessageConverter
-		{
-			get { return messageConverter; }
-			set { messageConverter = value; }
-		}
-
-		public void Close()
-		{
-			Dispose();
-		}
-	}
+    /// <summary>
+    /// MSMQ provider of ISession
+    /// </summary>
+    public class Session : ISession
+    {
+        private Connection connection;
+        private AcknowledgementMode acknowledgementMode;
+        private MessageQueueTransaction messageQueueTransaction;
+        private IMessageConverter messageConverter;
+
+        public Session(Connection connection, AcknowledgementMode acknowledgementMode)
+        {
+            this.connection = connection;
+            this.acknowledgementMode = acknowledgementMode;
+            MessageConverter = connection.MessageConverter;
+            if(this.acknowledgementMode == AcknowledgementMode.Transactional)
+            {
+                MessageQueueTransaction = new MessageQueueTransaction();
+            }
+        }
+
+        public void Dispose()
+        {
+            if(MessageQueueTransaction != null)
+            {
+                MessageQueueTransaction.Dispose();
+            }
+        }
+
+        public IMessageProducer CreateProducer()
+        {
+            return CreateProducer(null);
+        }
+
+        public IMessageProducer CreateProducer(IDestination destination)
+        {
+            return new MessageProducer(this, (Destination) destination);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination)
+        {
+            return CreateConsumer(destination, null);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+        {
+            return CreateConsumer(destination, selector, false);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+        {
+            if(selector != null)
+            {
+                throw new NotSupportedException("Selectors are not supported by MSMQ");
+            }
+            MessageQueue queue = MessageConverter.ToMsmqDestination(destination);
+            return new MessageConsumer(this, acknowledgementMode, queue);
+        }
+
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+        {
+            throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
+        }
+
+        public void DeleteDurableConsumer(string name)
+        {
+            throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ");
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IQueue GetQueue(string name)
+        {
+            return new Queue(name);
+        }
+
+        public ITopic GetTopic(string name)
+        {
+            throw new NotSupportedException("Topics are not supported by MSMQ");
+        }
+
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            throw new NotSupportedException("Tempoary Queues are not supported by MSMQ");
+        }
+
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            throw new NotSupportedException("Tempoary Topics are not supported by MSMQ");
+        }
+
+        /// <summary>
+        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+        /// </summary>
+        public void DeleteDestination(IDestination destination)
+        {
+            // TODO: Implement if possible.  If not possible, then change exception to NotSupportedException().
+            throw new NotImplementedException();
+        }
+
+        public IMessage CreateMessage()
+        {
+            BaseMessage answer = new BaseMessage();
+            return answer;
+        }
+
+
+        public ITextMessage CreateTextMessage()
+        {
+            TextMessage answer = new TextMessage();
+            return answer;
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            TextMessage answer = new TextMessage(text);
+            return answer;
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return new MapMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return new BytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            BytesMessage answer = new BytesMessage();
+            answer.Content = body;
+            return answer;
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return new StreamMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(Object body)
+        {
+            ObjectMessage answer = new ObjectMessage();
+            answer.Body = body;
+            return answer;
+        }
+
+        public void Commit()
+        {
+            if(!Transacted)
+            {
+                throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
+            }
+            messageQueueTransaction.Commit();
+        }
+
+        public void Rollback() // aborts the session's MessageQueueTransaction
+        {
+            if(!Transacted) // the constructor only creates a transaction for Transactional sessions
+            {
+                throw new InvalidOperationException("You cannot perform a Rollback() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode);
+            }
+            messageQueueTransaction.Abort();
+        }
+
+        // Properties
+        public Connection Connection
+        {
+            get { return connection; }
+        }
+
+        /// <summary>
+        /// The default timeout for network requests.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        public bool Transacted
+        {
+            get { return acknowledgementMode == AcknowledgementMode.Transactional; }
+        }
+
+        public AcknowledgementMode AcknowledgementMode // the mode passed to the Session constructor
+        {
+            get { return acknowledgementMode; }
+        }
+
+        public MessageQueueTransaction MessageQueueTransaction
+        {
+            get
+            {
+                if(null != messageQueueTransaction
+                    && messageQueueTransaction.Status != MessageQueueTransactionStatus.Pending)
+                {
+                    messageQueueTransaction.Begin();
+                }
+
+                return messageQueueTransaction;
+            }
+            set { messageQueueTransaction = value; }
+        }
+
+        public IMessageConverter MessageConverter
+        {
+            get { return messageConverter; }
+            set { messageConverter = value; }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        public void Close()
+        {
+            Dispose();
+        }
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Sun Aug 29 18:44:38 2010
@@ -280,6 +280,20 @@ namespace Apache.NMS.Stomp
             get { return this.messageTransformation; }
         }
 
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
         #endregion
 
         /// <summary>
@@ -350,6 +364,9 @@ namespace Apache.NMS.Stomp
             options = URISupport.GetProperties(options, "session.");
             URISupport.SetProperties(session, options);
 
+            session.ConsumerTransformer = this.ConsumerTransformer;
+            session.ProducerTransformer = this.ProducerTransformer;
+            
             if(IsStarted)
             {
                 session.Start();

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/ConnectionFactory.cs Sun Aug 29 18:44:38 2010
@@ -309,6 +309,20 @@ namespace Apache.NMS.Stomp
             }
         }
 
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
         #endregion
 
         protected virtual void ConfigureConnection(Connection connection)
@@ -322,6 +336,8 @@ namespace Apache.NMS.Stomp
             connection.RequestTimeout = this.requestTimeout;
             connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
             connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+            connection.ConsumerTransformer = this.consumerTransformer;
+            connection.ProducerTransformer = this.producerTransformer;
         }
 
         protected static void ExceptionHandler(Exception ex)

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Sun Aug 29 18:44:38 2010
@@ -36,6 +36,7 @@ namespace Apache.NMS.Stomp
     /// </summary>
     public class MessageConsumer : IMessageConsumer, IDispatcher
     {
+        private readonly MessageTransformation messageTransformation;
         private readonly MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel();
         private readonly LinkedList<MessageDispatch> dispatchedMessages = new LinkedList<MessageDispatch>();
         private readonly ConsumerInfo info;
@@ -65,6 +66,7 @@ namespace Apache.NMS.Stomp
             this.session = session;
             this.info = info;
             this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+            this.messageTransformation = this.session.Connection.MessageTransformation;
         }
 
         ~MessageConsumer()
@@ -90,6 +92,13 @@ namespace Apache.NMS.Stomp
             set { this.redeliveryPolicy = value; }
         }
 
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
         #endregion
 
         #region IMessageConsumer Members
@@ -838,6 +847,15 @@ namespace Apache.NMS.Stomp
         {
             Message message = dispatch.Message.Clone() as Message;
 
+            if(this.ConsumerTransformer != null)
+            {
+                IMessage transformed = this.consumerTransformer(this.session, this, message);
+                if(transformed != null)
+                {
+                    message = this.messageTransformation.TransformMessage<Message>(transformed);
+                }
+            }
+
             message.Connection = this.session.Connection;
 
             if(this.session.IsClientAcknowledge)

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageProducer.cs Sun Aug 29 18:44:38 2010
@@ -168,6 +168,15 @@ namespace Apache.NMS.Stomp
                 throw new NotSupportedException("This producer can only send messages to: " + this.info.Destination.PhysicalName);
             }
 
+            if(this.producerTransformer != null)
+            {
+                IMessage transformed = this.producerTransformer(this.session, this, message);
+                if(transformed != null)
+                {
+                    message = transformed;
+                }
+            }
+
             Message stompMessage = this.messageTransformation.TransformMessage<Message>(message);
 
             stompMessage.ProducerId = info.ProducerId;
@@ -242,6 +251,15 @@ namespace Apache.NMS.Stomp
             set { this.disableMessageTimestamp = value; }
         }
 
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        #region Message Creation Factory Methods.
+
         public IMessage CreateMessage()
         {
             return session.CreateMessage();
@@ -282,5 +300,7 @@ namespace Apache.NMS.Stomp
             return session.CreateStreamMessage();
         }
 
+        #endregion
+
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs Sun Aug 29 18:44:38 2010
@@ -207,6 +207,20 @@ namespace Apache.NMS.Stomp
             get { return Interlocked.Increment(ref this.nextDeliveryId); }
         }
 
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
         #endregion
 
         #region ISession Members
@@ -343,6 +357,7 @@ namespace Apache.NMS.Stomp
             try
             {
                 producer = new MessageProducer(this, command);
+                producer.ProducerTransformer = this.ProducerTransformer;
                 producers[producerId] = producer;
             }
             catch(Exception)
@@ -386,15 +401,17 @@ namespace Apache.NMS.Stomp
             try
             {
                 consumer = new MessageConsumer(this, command);
-                // lets register the consumer first in case we start dispatching messages immediately
+                consumer.ConsumerTransformer = this.ConsumerTransformer;
                 consumers[consumerId] = consumer;
-                this.Connection.SyncRequest(command);
 
                 if(this.Started)
                 {
                     consumer.Start();
                 }
 
+                // the consumer was registered above, before this request, in case we start dispatching messages immediately
+                this.Connection.SyncRequest(command);
+
                 return consumer;
             }
             catch(Exception)
@@ -428,7 +445,7 @@ namespace Apache.NMS.Stomp
             try
             {
                 consumer = new MessageConsumer(this, command);
-                // lets register the consumer first in case we start dispatching messages immediately
+                consumer.ConsumerTransformer = this.ConsumerTransformer;
                 consumers[consumerId] = consumer;
 
                 if(this.Started)

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageTransformerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageTransformerTest.cs?rev=990623&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageTransformerTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageTransformerTest.cs Sun Aug 29 18:44:38 2010
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.Util;
+using Apache.NMS.Test;
+
+using NUnit.Framework;
+
+namespace Apache.NMS.Stomp.Test
+{
+	[TestFixture]
+	public class MessageTransformerTest : NMSTestSupport
+	{
+		private string propertyName = "ADDITIONAL-PROPERTY";
+		private string propertyValue = "ADDITIONAL-PROPERTY-VALUE";
+				
+		[Test]
+		public void TestProducerTransformer(
+			[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+			MsgDeliveryMode deliveryMode)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+				{
+					IDestination destination = session.CreateTemporaryTopic();
+					using(IMessageConsumer consumer = session.CreateConsumer(destination))
+					using(IMessageProducer producer = session.CreateProducer(destination))
+					{
+						producer.DeliveryMode = deliveryMode;
+						producer.ProducerTransformer = DoProducerTransform;
+
+                        IMessage message = session.CreateMessage();
+
+                        message.Properties["Test"] = "Value";
+
+                        producer.Send(message);
+
+                        message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(message.Properties.Count == 2);
+
+                        Assert.AreEqual("Value", message.Properties["Test"]);
+                        Assert.AreEqual(propertyValue, message.Properties[propertyName]);
+					}
+				}
+			}
+		}
+		
+		[Test]
+		public void TestConsumerTransformer(
+			[Values(MsgDeliveryMode.Persistent, MsgDeliveryMode.NonPersistent)]
+			MsgDeliveryMode deliveryMode)
+		{
+			using(IConnection connection = CreateConnection())
+			{
+				connection.Start();
+				using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+				{
+					IDestination destination = session.CreateTemporaryTopic();
+					using(IMessageConsumer consumer = session.CreateConsumer(destination))
+					using(IMessageProducer producer = session.CreateProducer(destination))
+					{
+						producer.DeliveryMode = deliveryMode;
+
+						consumer.ConsumerTransformer = DoConsumerTransform;
+
+                        IMessage message = session.CreateMessage();
+
+                        message.Properties["Test"] = "Value";
+
+                        producer.Send(message);
+
+                        message = consumer.Receive(TimeSpan.FromMilliseconds(5000));
+
+                        Assert.IsNotNull(message);
+                        Assert.IsTrue(message.Properties.Count == 2, "Property Count should be 2");
+
+                        Assert.AreEqual("Value", message.Properties["Test"], "Propert 'Value' was incorrect");
+                        Assert.AreEqual(propertyValue, message.Properties[propertyName], "Property not inserted");
+                    }
+				}
+			}
+		}
+		
+		private IMessage DoProducerTransform(ISession session, IMessageProducer producer, IMessage message)
+		{
+			message.Properties[propertyName] = propertyValue;
+			
+			return message;
+		}
+
+		private IMessage DoConsumerTransform(ISession session, IMessageConsumer consumer, IMessage message)
+		{
+            IMessage newMessage = session.CreateMessage();
+
+            MessageTransformation.CopyNMSMessageProperties(message, newMessage);
+
+			newMessage.Properties[propertyName] = propertyValue;
+
+			return newMessage;
+		}
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageTransformerTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=990623&r1=990622&r2=990623&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Sun Aug 29 18:44:38 2010
@@ -57,14 +57,14 @@
   <ItemGroup>
     <Reference Include="System" />
     <Reference Include="System.Xml" />
-    <Reference Include="Apache.NMS.Test, Version=1.4.0.2025, Culture=neutral, PublicKeyToken=82756feee3957618">
-      <SpecificVersion>False</SpecificVersion>
-      <HintPath>lib\Apache.NMS\mono-2.0\Apache.NMS.Test.dll</HintPath>
-    </Reference>
     <Reference Include="nunit.framework, Version=2.5.5.10112, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>lib\NUnit\mono-2.0\nunit.framework.dll</HintPath>
     </Reference>
+    <Reference Include="Apache.NMS.Test, Version=1.4.0.2064, Culture=neutral, PublicKeyToken=82756feee3957618">
+      <SpecificVersion>False</SpecificVersion>
+      <HintPath>lib\Apache.NMS\mono-2.0\Apache.NMS.Test.dll</HintPath>
+    </Reference>
   </ItemGroup>
   <ItemGroup>
     <BootstrapperPackage Include="Microsoft.Net.Framework.2.0">
@@ -130,5 +130,6 @@
     <Compile Include="src\test\csharp\MapMessageTest.cs" />
     <Compile Include="src\test\csharp\Commands\ConsumerIdTest.cs" />
     <Compile Include="src\test\csharp\Commands\ProducerIdTest.cs" />
+    <Compile Include="src\test\csharp\MessageTransformerTest.cs" />
   </ItemGroup>
 </Project>
\ No newline at end of file



Mime
View raw message