activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r1751831 [1/4] - in /activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk: ./ src/main/csharp/ src/main/csharp/Readers/ src/main/csharp/Selector/
Date Thu, 07 Jul 2016 20:47:10 GMT
Author: jgomes
Date: Thu Jul  7 20:47:10 2016
New Revision: 1751831

URL: http://svn.apache.org/viewvc?rev=1751831&view=rev
Log:
Apply patch for AMQNET-554. Support for message properties and selectors. Thanks Stephane Ramet!

Added:
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByIdMessageReader.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByLookupIdMessageReader.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/BySelectorMessageReader.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/IMessageReader.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/MessageReaderUtil.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/NonFilteringMessageReader.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ANDExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/AlignedNumericValues.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ArithmeticExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BinaryExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BooleanCastExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BooleanConstantExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/BooleanUnaryExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ComparisonExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ConstantExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/DivideExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/EqualExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/GreaterExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/GreaterOrEqualExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/IBooleanExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/IExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/InExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/IsNullExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LesserExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LesserOrEqualExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LikeExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/LogicExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/MessageEvaluationContext.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/MinusExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ModExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/MultiplyExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/NOTExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/NegateExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ORExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/ParseException.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/PlusExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/PropertyExpression.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParser.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParser.csc
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParserConstants.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SelectorParserTokenManager.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/SimpleCharStream.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/Token.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/TokenMgrError.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Selector/UnaryExpression.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/BaseMessage.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverter.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/vs2008-msmq.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/BaseMessage.cs?rev=1751831&r1=1751830&r2=1751831&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/BaseMessage.cs Thu Jul  7 20:47:10 2016
@@ -1,4 +1,4 @@
-/*
+ /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -24,28 +24,9 @@ namespace Apache.NMS.MSMQ
 
 	public class BaseMessage : IMessage
 	{
-		private PrimitiveMap propertiesMap = new PrimitiveMap();
-		private IDestination destination;
-		private string correlationId;
-		private TimeSpan timeToLive;
-		private string messageId;
-		private MsgDeliveryMode deliveryMode;
-		private MsgPriority priority;
-		private Destination replyTo;
-		private byte[] content;
-		private string type;
-		private event AcknowledgeHandler Acknowledger;
-		private DateTime timestamp = new DateTime();
-		private bool readOnlyMsgBody = false;
-
-		public bool ReadOnlyBody
-		{
-			get { return readOnlyMsgBody; }
-			set { readOnlyMsgBody = value; }
-		}
-
-		// IMessage interface
+		#region Acknowledgement
 
+		private event AcknowledgeHandler Acknowledger;
 		public void Acknowledge()
 		{
 			if(null != Acknowledger)
@@ -54,6 +35,27 @@ namespace Apache.NMS.MSMQ
 			}
 		}
 
+		#endregion
+
+		#region Message body
+
+		private byte[] content;
+		public byte[] Content
+		{
+			get { return content; }
+			set { this.content = value; }
+		}
+
+		private bool readOnlyMsgBody = false;
+		/// <summary>
+		/// Whether the message body is read-only.
+		/// </summary>
+		public bool ReadOnlyBody
+		{
+			get { return readOnlyMsgBody; }
+			set { readOnlyMsgBody = value; }
+		}
+
 		/// <summary>
 		/// Clears out the message body. Clearing a message's body does not clear its header
 		/// values or property entries.
@@ -67,26 +69,82 @@ namespace Apache.NMS.MSMQ
 			this.readOnlyMsgBody = false;
 		}
 
+		#endregion
+
+		#region Message properties
+
+		private PrimitiveMap propertiesMap = new PrimitiveMap();
+		private MessagePropertyIntercepter propertyHelper;
+		/// <summary>
+		/// Provides access to the message properties (headers)
+		/// </summary>
+		public Apache.NMS.IPrimitiveMap Properties
+		{
+			get
+			{
+                if(propertyHelper == null)
+                {
+				    propertyHelper = new Apache.NMS.Util.MessagePropertyIntercepter(
+					    this, propertiesMap, this.ReadOnlyProperties);
+				}
+
+                return propertyHelper;
+			}
+		}
+
+		private bool readOnlyMsgProperties = false;
+		/// <summary>
+		/// Whether the message properties are read-only.
+		/// </summary>
+		public virtual bool ReadOnlyProperties
+		{
+			get { return this.readOnlyMsgProperties; }
+
+			set
+			{
+				if(this.propertyHelper != null)
+				{
+					this.propertyHelper.ReadOnly = value;
+				}
+				this.readOnlyMsgProperties = value;
+			}
+		}
+
 		/// <summary>
 		/// Clears a message's properties.
-		///
 		/// The message's header fields and body are not cleared.
 		/// </summary>
-		public virtual void ClearProperties()
+		public void ClearProperties()
 		{
-			propertiesMap.Clear();
+            this.ReadOnlyProperties = false;
+            this.propertiesMap.Clear();
 		}
 
-		// Properties
+		public object GetObjectProperty(string name)
+		{
+			return Properties[name];
+		}
 
-		public IPrimitiveMap Properties
+		public void SetObjectProperty(string name, object value)
 		{
-			get { return propertiesMap; }
+            Properties[name] = value;
 		}
 
+		#endregion
 
-		// NMS headers
+		#region Message header fields
 
+		private string messageId;
+		/// <summary>
+		/// The message ID which is set by the provider
+		/// </summary>
+		public string NMSMessageId
+		{
+			get { return messageId; }
+			set { messageId = value; }
+		}
+
+		private string correlationId;
 		/// <summary>
 		/// The correlation ID used to correlate messages with conversations or long running business processes
 		/// </summary>
@@ -96,6 +154,7 @@ namespace Apache.NMS.MSMQ
 			set { correlationId = value; }
 		}
 
+		private IDestination destination;
 		/// <summary>
 		/// The destination of the message
 		/// </summary>
@@ -105,6 +164,7 @@ namespace Apache.NMS.MSMQ
 			set { destination = value; }
 		}
 
+		private TimeSpan timeToLive;
 		/// <summary>
 		/// The time in milliseconds that this message should expire in
 		/// </summary>
@@ -114,15 +174,7 @@ namespace Apache.NMS.MSMQ
 			set { timeToLive = value; }
 		}
 
-		/// <summary>
-		/// The message ID which is set by the provider
-		/// </summary>
-		public string NMSMessageId
-		{
-			get { return messageId; }
-			set { messageId = value; }
-		}
-
+		private MsgDeliveryMode deliveryMode;
 		/// <summary>
 		/// Whether or not this message is persistent
 		/// </summary>
@@ -132,6 +184,7 @@ namespace Apache.NMS.MSMQ
 			set { deliveryMode = value; }
 		}
 
+		private MsgPriority priority;
 		/// <summary>
 		/// The Priority on this message
 		/// </summary>
@@ -150,7 +203,7 @@ namespace Apache.NMS.MSMQ
             set { }
 		}
 
-
+		private Destination replyTo;
 		/// <summary>
 		/// The destination that the consumer of this message should send replies to
 		/// </summary>
@@ -160,7 +213,7 @@ namespace Apache.NMS.MSMQ
 			set { replyTo = (Destination) value; }
 		}
 
-
+		private DateTime timestamp = new DateTime();
 		/// <summary>
 		/// The timestamp the broker added to the message
 		/// </summary>
@@ -170,12 +223,7 @@ namespace Apache.NMS.MSMQ
 			set { timestamp = value; }
 		}
 
-		public byte[] Content
-		{
-			get { return content; }
-			set { this.content = value; }
-		}
-
+		private string type;
 		/// <summary>
 		/// The type name of this message
 		/// </summary>
@@ -185,15 +233,9 @@ namespace Apache.NMS.MSMQ
 			set { type = value; }
 		}
 
+        #endregion
 
-		public object GetObjectProperty(string name)
-		{
-			return null;
-		}
-
-		public void SetObjectProperty(string name, object value)
-		{
-		}
+        #region Check access mode
 
 		protected void FailIfReadOnlyBody()
 		{
@@ -205,11 +247,13 @@ namespace Apache.NMS.MSMQ
 
 		protected void FailIfWriteOnlyBody()
 		{
-			if( ReadOnlyBody == false )
+			if(ReadOnlyBody == false)
 			{
 				throw new MessageNotReadableException("Message is in Write-Only mode.");
 			}
 		}
+
+        #endregion
 	}
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs?rev=1751831&r1=1751830&r2=1751831&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/DefaultMessageConverter.cs Thu Jul  7 20:47:10 2016
@@ -32,12 +32,61 @@ namespace Apache.NMS.MSMQ
 		StreamMessage
 	}
 
-	public class DefaultMessageConverter : IMessageConverter
+    /// <summary>
+    /// This class provides default rules for converting MSMQ to and from
+    /// NMS messages, when the peer system expects or produces compatible
+    /// mappings, typically when the peer system is also implemented on
+    /// Apache.NMS.
+    /// Default mappings are as follows :
+    /// <ul>
+    /// <li>
+    ///   the MSMQ Message.AppSpecific field is used for specifying the NMS
+    ///   message type, as specified by the <c>NMSMessageType</c> enumeration.
+    /// </li>
+    /// <li>
+    ///   the MSMQ Message.Extension field is populated with a map
+    ///   (a marshalled <c>PrimitiveMap</c>) of message properties.
+    /// </li>
+    /// <li>
+    ///   in earlier versions of Apache.NMS.MSMQ, the MSMQ Message.Label
+    ///   field was populated with the value of the NMSType field. Setting
+    ///   <c>SetLabelAsNMSType</c> to true (the default value) applies that
+    ///   same rule, which makes it compatible with existing NMS peers. If
+    ///   set to false, the Message.Label field is populated with the value
+    ///   of a "Label" property, if it exists, thus making it readable by
+    ///   standard management or monitoring tools. The NMSType value is then
+    ///   transmitted as a field in the Message.Extension map.
+    /// </li>
+    /// </ul>
+    /// Please note that in earlier versions of Apache.NMS, only one property
+    /// was set in the Message.Extension field : the NMSCorrelationID.
+    /// The native Message.CorrelationId field is not settable, except for
+    /// reply messages explicitly created as such through the MSMQ API.
+    /// Transmission of the correlation ID through a mapped property called
+    /// NMSCorrelationID is therefore maintained.
+    /// When exchanging messages with a non-compatible peer, a specific
+    /// message converter must be provided, which should at least be able to
+    /// map message types and define the encoding used for text messages.
+    /// </summary>
+	public class DefaultMessageConverter : IMessageConverterEx
 	{
+        private bool setLabelAsNMSType = true;
+        public bool SetLabelAsNMSType
+        {
+            get { return setLabelAsNMSType; }
+            set { setLabelAsNMSType = value; }
+        }
+
+        #region Messages
+        /// <summary>
+        /// Converts the specified NMS message to an equivalent MSMQ message.
+        /// </summary>
+        /// <param name="message">NMS message to be converted.</param>
+        /// <result>Converted MSMQ message.</result>
 		public virtual Message ToMsmqMessage(IMessage message)
 		{
 			Message msmqMessage = new Message();
-			PrimitiveMap metaData = new PrimitiveMap();
+			PrimitiveMap propertyData = new PrimitiveMap();
 
 			ConvertMessageBodyToMSMQ(message, msmqMessage);
 
@@ -48,32 +97,72 @@ namespace Apache.NMS.MSMQ
 
 			if(message.NMSCorrelationID != null)
 			{
-				metaData.SetString("NMSCorrelationID", message.NMSCorrelationID);
+				propertyData.SetString("NMSCorrelationID", message.NMSCorrelationID);
 			}
 
 			msmqMessage.Recoverable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent);
-			msmqMessage.Priority = ToMessagePriority(message.NMSPriority);
+			msmqMessage.Priority = ToMsmqMessagePriority(message.NMSPriority);
 			msmqMessage.ResponseQueue = ToMsmqDestination(message.NMSReplyTo);
 			if(message.NMSType != null)
 			{
-				msmqMessage.Label = message.NMSType;
-			}
+                if(SetLabelAsNMSType)
+                {
+				    propertyData.SetString("NMSType", message.NMSType);
+                }
+                else
+                {
+                    msmqMessage.Label = message.NMSType;
+                }
+			}
+
+            // Populate property data
+            foreach(object keyObject in message.Properties.Keys)
+            {
+              string key = (keyObject as string);
+              object val = message.Properties.GetString(key);
+              if(!SetLabelAsNMSType && string.Compare(key, "Label", true) == 0 && val != null)
+              {
+				msmqMessage.Label = val.ToString();
+              }
+              else
+              {
+				propertyData[key] = val;
+              }
+            }
 
-			// Store the NMS meta data in the extension area
-			msmqMessage.Extension = metaData.Marshal();
+			// Store the NMS property data in the extension area
+			msmqMessage.Extension = propertyData.Marshal();
 			return msmqMessage;
 		}
 
+        /// <summary>
+        /// Converts the specified MSMQ message to an equivalent NMS message
+        /// (including its message body).
+        /// </summary>
+        /// <param name="message">MSMQ message to be converted.</param>
+        /// <result>Converted NMS message.</result>
 		public virtual IMessage ToNmsMessage(Message message)
 		{
-			BaseMessage answer = CreateNmsMessage(message);
-			// Get the NMS meta data from the extension area
-			PrimitiveMap metaData = PrimitiveMap.Unmarshal(message.Extension);
+            return ToNmsMessage(message, true);
+        }
+
+        /// <summary>
+        /// Converts the specified MSMQ message to an equivalent NMS message.
+        /// </summary>
+        /// <param name="message">MSMQ message to be converted.</param>
+        /// <param name="convertBody">true if message body should be converted.</param>
+        /// <result>Converted NMS message.</result>
+		public virtual IMessage ToNmsMessage(Message message, bool convertBody)
+		{
+			BaseMessage answer = CreateNmsMessage(message, convertBody);
+
+			// Get the NMS property data from the extension area
+			PrimitiveMap propertyData = PrimitiveMap.Unmarshal(message.Extension);
 
 			try
 			{
 				answer.NMSMessageId = message.Id;
-				answer.NMSCorrelationID = metaData.GetString("NMSCorrelationID");
+				answer.NMSCorrelationID = propertyData.GetString("NMSCorrelationID");
 				answer.NMSDeliveryMode = (message.Recoverable ? MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent);
 				answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
 			}
@@ -83,18 +172,85 @@ namespace Apache.NMS.MSMQ
 
 			try
 			{
-				answer.NMSType = message.Label;
 				answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
 				answer.NMSTimeToLive = message.TimeToBeReceived;
+			    answer.NMSPriority = ToNmsMsgPriority(message.Priority);
 			}
 			catch(InvalidOperationException)
 			{
 			}
 
+			try
+			{
+                if(message.Label != null)
+                {
+                    if(SetLabelAsNMSType)
+                    {
+                        answer.NMSType = message.Label;
+                    }
+                    else
+                    {
+                        answer.Properties["Label"] = message.Label;
+                    }
+                }
+                answer.Properties["LookupId"] = message.LookupId;
+			}
+			catch(InvalidOperationException)
+			{
+			}
+
+            foreach(object keyObject in propertyData.Keys)
+            {
+			    try
+			    {
+                    string key = (keyObject as string);
+                    if(string.Compare(key, "NMSType", true) == 0)
+                    {
+			    	    answer.NMSType = propertyData.GetString(key);
+                    }
+                    else if(string.Compare(key, "NMSCorrelationID", true) == 0)
+                    {
+			    	    answer.NMSCorrelationID = propertyData.GetString("NMSCorrelationID");
+                    }
+                    else
+                    {
+			    	    answer.Properties[key] = propertyData[key];
+                    }
+			    }
+			    catch(InvalidOperationException)
+			    {
+			    }
+            }
 			return answer;
 		}
 
-		private static MessagePriority ToMessagePriority(MsgPriority msgPriority)
+        #endregion
+
+        #region Message priority
+
+        // Message priorities are defined as follows :
+        // | MSMQ               | NMS                |
+        // | MessagePriority	| MsgPriority        |
+        // +--------------------+--------------------+
+        // | Lowest             | Lowest             |
+        // | VeryLow            | VeryLow            |
+        // | Low                | Low                |
+        // |                \-> | AboveLow           |
+        // |                /-> | BelowNormal        |
+        // | Normal             | Normal             |
+        // | AboveNormal        | AboveNormal        |
+        // | High               | High               |
+        // | VeryHigh           | VeryHigh           |
+        // | Highest            | Highest            |
+        // +--------------------+--------------------+
+
+        /// <summary>
+        /// Converts the specified NMS message priority to an equivalent MSMQ
+        /// message priority.
+        /// </summary>
+        /// <param name="msgPriority">NMS message priority to be converted.</param>
+        /// <result>Converted MSMQ message priority.</result>
+		private static MessagePriority ToMsmqMessagePriority(MsgPriority msgPriority)
 		{
 			switch(msgPriority)
 			{
@@ -127,6 +283,153 @@ namespace Apache.NMS.MSMQ
 			}
 		}
 
+        /// <summary>
+        /// Converts the specified MSMQ message priority to an equivalent NMS
+        /// message priority.
+        /// </summary>
+        /// <param name="messagePriority">MSMQ message priority to be converted.</param>
+        /// <result>Converted NMS message priority.</result>
+		private static MsgPriority ToNmsMsgPriority(MessagePriority messagePriority)
+		{
+			switch(messagePriority)
+			{
+			case MessagePriority.Lowest:
+				return MsgPriority.Lowest;
+
+			case MessagePriority.VeryLow:
+				return MsgPriority.VeryLow;
+
+			case MessagePriority.Low:
+				return MsgPriority.Low;
+
+			default:
+			case MessagePriority.Normal:
+				return MsgPriority.Normal;
+
+			case MessagePriority.AboveNormal:
+				return MsgPriority.AboveNormal;
+
+			case MessagePriority.High:
+				return MsgPriority.High;
+
+			case MessagePriority.VeryHigh:
+				return MsgPriority.VeryHigh;
+
+			case MessagePriority.Highest:
+				return MsgPriority.Highest;
+			}
+		}
+
+        #endregion
+
+        #region Message creation
+
+        // Conversion of the message body has been separated from the creation
+        // of the NMS message object for performance reasons when using
+        // selectors (selectors handle only message attributes, not message
+        // bodies).
+        // CreateNmsMessage(Message) is maintained for compatibility reasons
+        // with existing clients that may have implemented derived classes,
+        // instead of completely removing the body conversion part from the
+        // method.
+
+        /// <summary>
+        /// Creates an NMS message of appropriate type for the specified MSMQ
+        /// message, and converts the message body.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <result>NMS message created for retrieving the MSMQ message.</result>
+		protected virtual BaseMessage CreateNmsMessage(Message message)
+		{
+            return CreateNmsMessage(message, true);
+        }
+
+        /// <summary>
+        /// Creates an NMS message of appropriate type for the specified MSMQ
+        /// message, and converts the message body if specified.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <param name="convertBody">true if the message body must be
+        /// converted.</param>
+        /// <result>NMS message created for retrieving the MSMQ message.</result>
+		protected virtual BaseMessage CreateNmsMessage(Message message,
+            bool convertBody)
+		{
+			BaseMessage result = null;
+
+			if((int) NMSMessageType.TextMessage == message.AppSpecific)
+			{
+				TextMessage textMessage = new TextMessage();
+
+                if(convertBody)
+                {
+                    ConvertTextMessageBodyToNMS(message, textMessage);
+                }
+
+				result = textMessage;
+			}
+			else if((int) NMSMessageType.BytesMessage == message.AppSpecific)
+			{
+				BytesMessage bytesMessage = new BytesMessage();
+
+                if(convertBody)
+                {
+                    ConvertBytesMessageBodyToNMS(message, bytesMessage);
+                }
+
+				result = bytesMessage;
+			}
+			else if((int) NMSMessageType.ObjectMessage == message.AppSpecific)
+			{
+				ObjectMessage objectMessage = new ObjectMessage();
+
+                if(convertBody)
+                {
+                    ConvertObjectMessageBodyToNMS(message, objectMessage);
+                }
+
+				result = objectMessage;
+			}
+			else if((int) NMSMessageType.MapMessage == message.AppSpecific)
+			{
+				MapMessage mapMessage = new MapMessage();
+
+                if(convertBody)
+                {
+                    ConvertMapMessageBodyToNMS(message, mapMessage);
+                }
+
+				result = mapMessage;
+			}
+			else if((int) NMSMessageType.StreamMessage == message.AppSpecific)
+			{
+				StreamMessage streamMessage = new StreamMessage();
+
+                if(convertBody)
+                {
+                    ConvertStreamMessageBodyToNMS(message, streamMessage);
+                }
+
+				result = streamMessage;
+			}
+			else
+			{
+				BaseMessage baseMessage = new BaseMessage();
+				result = baseMessage;
+			}
+
+			return result;
+		}
+
+        #endregion
+
+        #region Message body
+
+        /// <summary>
+        /// Converts an NMS message body to the equivalent MSMQ message body.
+        /// </summary>
+        /// <param name="message">Source NMS message.</param>
+        /// <param name="answer">Target MSMQ message.</param>
 		protected virtual void ConvertMessageBodyToMSMQ(IMessage message, Message answer)
 		{
 			if(message is TextMessage)
@@ -172,78 +475,133 @@ namespace Apache.NMS.MSMQ
 			}
 		}
 
-		protected virtual BaseMessage CreateNmsMessage(Message message)
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS message body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS message.</param>
+		public virtual void ConvertMessageBodyToNMS(Message message, IMessage answer)
 		{
-			BaseMessage result = null;
-
-			if((int) NMSMessageType.TextMessage == message.AppSpecific)
+			if(answer is TextMessage)
 			{
-				TextMessage textMessage = new TextMessage();
-				string content = String.Empty;
-
-				if(message.BodyStream != null && message.BodyStream.Length > 0)
-				{
-					byte[] buf = null;
-					buf = new byte[message.BodyStream.Length];
-					message.BodyStream.Read(buf, 0, buf.Length);
-					content = Encoding.UTF32.GetString(buf);
-				}
-
-				textMessage.Text = content;
-				result = textMessage;
+				ConvertTextMessageBodyToNMS(message, (TextMessage)answer);
 			}
-			else if((int) NMSMessageType.BytesMessage == message.AppSpecific)
+			else if(answer is BytesMessage)
 			{
-				byte[] buf = null;
-
-				if(message.BodyStream != null && message.BodyStream.Length > 0)
-				{
-					buf = new byte[message.BodyStream.Length];
-					message.BodyStream.Read(buf, 0, buf.Length);
-				}
-
-				BytesMessage bytesMessage = new BytesMessage();
-				bytesMessage.Content = buf;
-				result = bytesMessage;
+				ConvertBytesMessageBodyToNMS(message, (BytesMessage)answer);
 			}
-			else if((int) NMSMessageType.ObjectMessage == message.AppSpecific)
+			else if(answer is ObjectMessage)
 			{
-				ObjectMessage objectMessage = new ObjectMessage();
-
-				objectMessage.Body = message.Body;
-				result = objectMessage;
+				ConvertObjectMessageBodyToNMS(message, (ObjectMessage)answer);
 			}
-			else if((int) NMSMessageType.MapMessage == message.AppSpecific)
+			else if(answer is MapMessage)
 			{
-				byte[] buf = null;
-
-				if(message.BodyStream != null && message.BodyStream.Length > 0)
-				{
-					buf = new byte[message.BodyStream.Length];
-					message.BodyStream.Read(buf, 0, buf.Length);
-				}
-
-				MapMessage mapMessage = new MapMessage();
-				mapMessage.Body = PrimitiveMap.Unmarshal(buf);
-				result = mapMessage;
+				ConvertMapMessageBodyToNMS(message, (MapMessage)answer);
 			}
-			else if((int) NMSMessageType.StreamMessage == message.AppSpecific)
+			else if(answer is StreamMessage)
 			{
-				StreamMessage streamMessage = new StreamMessage();
+				ConvertStreamMessageBodyToNMS(message, (StreamMessage)answer);
+			}
 
-				// TODO: Implement
-				result = streamMessage;
+			return;
+		}
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS text message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS text message.</param>
+		public virtual void ConvertTextMessageBodyToNMS(Message message,
+            TextMessage answer)
+		{
+			string content = String.Empty;
+
+			if(message.BodyStream != null && message.BodyStream.Length > 0)
+			{
+				byte[] buf = new byte[message.BodyStream.Length];
+				message.BodyStream.Read(buf, 0, buf.Length);
+				content = Encoding.UTF32.GetString(buf);
 			}
-			else
+
+			answer.Text = content;
+		}
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS bytes message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS bytes message.</param>
+		public virtual void ConvertBytesMessageBodyToNMS(Message message,
+            BytesMessage answer)
+		{
+			byte[] buf = null;
+
+			if(message.BodyStream != null && message.BodyStream.Length > 0)
 			{
-				BaseMessage baseMessage = new BaseMessage();
+				buf = new byte[message.BodyStream.Length];
+				message.BodyStream.Read(buf, 0, buf.Length);
+			}
 
-				result = baseMessage;
+			answer.Content = buf;
+		}
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS object message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS object message.</param>
+		public virtual void ConvertObjectMessageBodyToNMS(Message message,
+            ObjectMessage answer)
+		{
+			answer.Body = message.Body;
+		}
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS map message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS map message.</param>
+		public virtual void ConvertMapMessageBodyToNMS(Message message,
+            MapMessage answer)
+		{
+			byte[] buf = null;
+
+			if(message.BodyStream != null && message.BodyStream.Length > 0)
+			{
+				buf = new byte[message.BodyStream.Length];
+				message.BodyStream.Read(buf, 0, buf.Length);
 			}
 
-			return result;
+			answer.Body = PrimitiveMap.Unmarshal(buf);
+		}
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS stream message
+        /// body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS stream message.</param>
+		public virtual void ConvertStreamMessageBodyToNMS(Message message,
+            StreamMessage answer)
+		{
+			// TODO: Implement
+            throw new NotImplementedException();
 		}
 
+        #endregion
+
+        #region Destination
+
+        /// <summary>
+        /// Converts an NMS destination to the equivalent MSMQ destination
+        /// (ie. queue).
+        /// </summary>
+        /// <param name="destination">NMS destination.</param>
+        /// <result>MSMQ queue.</result>
 		public MessageQueue ToMsmqDestination(IDestination destination)
 		{
 			if(null == destination)
@@ -254,6 +612,12 @@ namespace Apache.NMS.MSMQ
 			return new MessageQueue((destination as Destination).Path);
 		}
 
+        /// <summary>
+        /// Converts an MSMQ destination (i.e. queue) to the equivalent NMS
+        /// destination.
+        /// </summary>
+        /// <param name="destinationQueue">MSMQ destination queue.</param>
+        /// <returns>NMS destination.</returns>
 		protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue)
 		{
 			if(null == destinationQueue)
@@ -263,5 +627,7 @@ namespace Apache.NMS.MSMQ
 
 			return new Queue(destinationQueue.Path);
 		}
+
+        #endregion
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverter.cs?rev=1751831&r1=1751830&r2=1751831&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverter.cs Thu Jul  7 20:47:10 2016
@@ -20,15 +20,27 @@ namespace Apache.NMS.MSMQ
 {
 	public interface IMessageConverter
 	{
-
-		/// <summary>
-		/// Method ToMSMQMessageQueue
-		/// </summary>
-		/// <param name="destination">An IDestination</param>
-		/// <returns>A  MessageQueue</returns>
-		MessageQueue ToMsmqDestination(IDestination destination);
-
+        /// <summary>
+        /// Converts the specified NMS message to an equivalent MSMQ message.
+        /// </summary>
+        /// <param name="message">NMS message to be converted.</param>
+        /// <returns>Converted MSMQ message.</returns>
 		Message ToMsmqMessage(IMessage message);
+
+        /// <summary>
+        /// Converts the specified MSMQ message to an equivalent NMS message
+        /// (including its message body).
+        /// </summary>
+        /// <param name="message">MSMQ message to be converted.</param>
+        /// <returns>Converted NMS message.</returns>
 		IMessage ToNmsMessage(Message message);
+
+        /// <summary>
+        /// Converts an NMS destination to the equivalent MSMQ destination
+        /// (i.e. queue).
+        /// </summary>
+        /// <param name="destination">NMS destination.</param>
+        /// <returns>MSMQ queue.</returns>
+		MessageQueue ToMsmqDestination(IDestination destination);
 	}
 }

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs?rev=1751831&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/IMessageConverterEx.cs Thu Jul  7 20:47:10 2016
@@ -0,0 +1,44 @@
+using System.Messaging;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ
+{
+    /// <summary>
+    /// Extended IMessageConverter interface supporting new methods for
+    /// optimizing message selection through "selectors".
+    /// The original IMessageConverter is maintained for compatibility
+    /// reasons with existing clients implementing it.
+    /// </summary>
+	public interface IMessageConverterEx : IMessageConverter
+	{
+        /// <summary>
+        /// Converts the specified MSMQ message to an equivalent NMS message.
+        /// </summary>
+        /// <param name="message">MSMQ message to be converted.</param>
+        /// <param name="convertBody">true if message body should be converted.</param>
+        /// <returns>Converted NMS message.</returns>
+		IMessage ToNmsMessage(Message message, bool convertBody);
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS message body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS message.</param>
+		void ConvertMessageBodyToNMS(Message message, IMessage answer);
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1751831&r1=1751830&r2=1751831&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Jul  7 20:47:10 2016
@@ -1,6 +1,8 @@
 using System;
 using System.Messaging;
 using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.MSMQ.Readers;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,12 +19,11 @@ using System.Threading;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.Util;
 
 namespace Apache.NMS.MSMQ
 {
     /// <summary>
-    /// An object capable of receiving messages from some destination
+    /// An object capable of receiving messages from some destination.
     /// </summary>
     public class MessageConsumer : IMessageConsumer
     {
@@ -31,8 +32,6 @@ namespace Apache.NMS.MSMQ
         private readonly Session session;
         private readonly AcknowledgementMode acknowledgementMode;
         private MessageQueue messageQueue;
-        private event MessageListener listener;
-        private int listenerCount = 0;
         private Thread asyncDeliveryThread = null;
         private AutoResetEvent pause = new AutoResetEvent(false);
         private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
@@ -44,17 +43,46 @@ namespace Apache.NMS.MSMQ
             set { this.consumerTransformer = value; }
         }
 
-        public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
+        private IMessageReader reader;
+
+        /// <summary>
+        /// Constructs a message consumer on the specified queue.
+        /// </summary>
+        /// <param name="session">The messaging session.</param>
+        /// <param name="acknowledgementMode">The message acknowledgement mode.</param>
+        /// <param name="messageQueue">The message queue to consume messages from.</param>
+        public MessageConsumer(Session session,
+            AcknowledgementMode acknowledgementMode, MessageQueue messageQueue)
+            : this(session, acknowledgementMode, messageQueue, null)
+        {
+        }
+
+        /// <summary>
+        /// Constructs a message consumer on the specified queue, using a
+        /// selector for filtering incoming messages.
+        /// </summary>
+        /// <param name="session">The messaging session.</param>
+        /// <param name="acknowledgementMode">The message acknowledgement mode.</param>
+        /// <param name="messageQueue">The message queue to consume messages from.</param>
+        /// <param name="selector">The selection criteria.</param>
+        public MessageConsumer(Session session,
+            AcknowledgementMode acknowledgementMode, MessageQueue messageQueue,
+            string selector)
         {
             this.session = session;
             this.acknowledgementMode = acknowledgementMode;
             this.messageQueue = messageQueue;
-            if(null != this.messageQueue)
+            if(this.messageQueue != null)
             {
                 this.messageQueue.MessageReadPropertyFilter.SetAll();
             }
+
+            reader = MessageReaderUtil.CreateMessageReader(
+                messageQueue, session.MessageConverter, selector);
         }
 
+        private int listenerCount = 0;
+        private event MessageListener listener;
         public event MessageListener Listener
         {
             add
@@ -85,32 +113,8 @@ namespace Apache.NMS.MSMQ
 
             if(messageQueue != null)
             {
-                Message message;
-
-                try
-                {
-                    message = messageQueue.Receive(zeroTimeout);
-                }
-                catch
-                {
-                    message = null;
-                }
-
-                if(null == message)
-                {
-                    ReceiveCompletedEventHandler receiveMsg =
-                            delegate(Object source, ReceiveCompletedEventArgs asyncResult) {
-                                message = messageQueue.EndReceive(asyncResult.AsyncResult);
-                                pause.Set();
-                            };
-
-                    messageQueue.ReceiveCompleted += receiveMsg;
-                    messageQueue.BeginReceive();
-                    pause.WaitOne();
-                    messageQueue.ReceiveCompleted -= receiveMsg;
-                }
-
-                nmsMessage = ToNmsMessage(message);
+                nmsMessage = reader.Receive();
+                nmsMessage = TransformMessage(nmsMessage);
             }
 
             return nmsMessage;
@@ -122,8 +126,8 @@ namespace Apache.NMS.MSMQ
 
             if(messageQueue != null)
             {
-                Message message = messageQueue.Receive(timeout);
-                nmsMessage = ToNmsMessage(message);
+                nmsMessage = reader.Receive(timeout);
+                nmsMessage = TransformMessage(nmsMessage);
             }
 
             return nmsMessage;
@@ -135,8 +139,8 @@ namespace Apache.NMS.MSMQ
 
             if(messageQueue != null)
             {
-                Message message = messageQueue.Receive(zeroTimeout);
-                nmsMessage = ToNmsMessage(message);
+                nmsMessage = reader.Receive(zeroTimeout);
+                nmsMessage = TransformMessage(nmsMessage);
             }
 
             return nmsMessage;
@@ -226,25 +230,20 @@ namespace Apache.NMS.MSMQ
             session.Connection.HandleException(e);
         }
 
-        protected virtual IMessage ToNmsMessage(Message message)
+        protected virtual IMessage TransformMessage(IMessage message)
         {
-            if(message == null)
-            {
-                return null;
-            }
-
-            IMessage converted = session.MessageConverter.ToNmsMessage(message);
+            IMessage transformed = message;
 
-            if(this.ConsumerTransformer != null)
+            if(message != null && this.ConsumerTransformer != null)
             {
-                IMessage newMessage = ConsumerTransformer(this.session, this, converted);
+                IMessage newMessage = ConsumerTransformer(this.session, this, message);
                 if(newMessage != null)
                 {
-                    converted = newMessage;
+                    transformed = newMessage;
                 }
             }
 
-            return converted;
+            return transformed;
         }
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs?rev=1751831&r1=1751830&r2=1751831&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/QueueBrowser.cs Thu Jul  7 20:47:10 2016
@@ -19,6 +19,7 @@ using System.Collections;
 using System.Messaging;
 using Apache.NMS;
 using Apache.NMS.Util;
+using Apache.NMS.MSMQ.Readers;
 
 namespace Apache.NMS.MSMQ
 {
@@ -30,7 +31,17 @@ namespace Apache.NMS.MSMQ
         private readonly Session session;
         private MessageQueue messageQueue;
 
+        private string selector;
+
+        private IMessageReader reader;
+        
 		public QueueBrowser(Session session, MessageQueue messageQueue)
+            : this(session, messageQueue, null)
+		{
+		}
+
+		public QueueBrowser(Session session, MessageQueue messageQueue,
+            string selector)
 		{
             this.session = session;
             this.messageQueue = messageQueue;
@@ -39,6 +50,8 @@ namespace Apache.NMS.MSMQ
                 this.messageQueue.MessageReadPropertyFilter.SetAll();
             }
 
+            reader = MessageReaderUtil.CreateMessageReader(
+                messageQueue, session.MessageConverter, selector);
 		}
 
 		~QueueBrowser()
@@ -95,7 +108,7 @@ namespace Apache.NMS.MSMQ
 
 		public string MessageSelector
 		{
-			get { throw new NotSupportedException(); }
+			get { return selector; }
 		}
 
 		public IQueue Queue
@@ -107,11 +120,14 @@ namespace Apache.NMS.MSMQ
 		{
 			private readonly Session session;
 			private readonly MessageEnumerator innerEnumerator;
+            private readonly IMessageReader reader;
 
-			public Enumerator(Session session, MessageQueue messageQueue)
+			public Enumerator(Session session, MessageQueue messageQueue,
+                IMessageReader reader)
 			{
 				this.session = session;
 				this.innerEnumerator = messageQueue.GetMessageEnumerator2();
+                this.reader = reader;
 			}
 
 			public object Current
@@ -124,7 +140,14 @@ namespace Apache.NMS.MSMQ
 
 			public bool MoveNext()
 			{
-				return this.innerEnumerator.MoveNext();
+                while(this.innerEnumerator.MoveNext())
+                {
+				    if(reader.Matches(this.innerEnumerator.Current))
+                    {
+                        return true;
+                    }
+                }
+                return false;
 			}
 
 			public void Reset()
@@ -135,7 +158,7 @@ namespace Apache.NMS.MSMQ
 
 		public IEnumerator GetEnumerator()
 		{
-			return new Enumerator(this.session, this.messageQueue);
+			return new Enumerator(this.session, this.messageQueue, this.reader);
 		}
 	}
 }

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs?rev=1751831&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/AbstractMessageReader.cs Thu Jul  7 20:47:10 2016
@@ -0,0 +1,126 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// Abstract MSMQ message reader. Derived classes support various
+    /// message filtering methods.
+    /// </summary>
+	public abstract class AbstractMessageReader : IMessageReader
+	{
+        protected MessageQueue messageQueue;
+        protected IMessageConverter messageConverter;
+        protected IMessageConverterEx messageConverterEx;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        public AbstractMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter)
+        {
+            this.messageQueue = messageQueue;
+
+            this.messageConverter = messageConverter;
+            this.messageConverterEx = (messageConverter as IMessageConverterEx);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public abstract IMessage Peek();
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public abstract IMessage Peek(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive();
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive(MessageQueueTransaction transaction);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction);
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <returns>true if the message matches the selection criteria.</returns>
+        public abstract bool Matches(Message message);
+
+        /// <summary>
+        /// Converts an MSMQ message to an NMS message, using the converter
+        /// specified at construction time.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <returns>NMS message.</returns>
+        protected IMessage Convert(Message message)
+        {
+            return message == null ? null : messageConverter.ToNmsMessage(message);
+        }
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs?rev=1751831&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs Thu Jul  7 20:47:10 2016
@@ -0,0 +1,139 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+        /// MSMQ message reader, returning messages matching the specified
+        /// correlation identifier.
+    /// </summary>
+	public class ByCorrelationIdMessageReader : AbstractMessageReader
+	{
+        private string correlationId;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="correlationId">The correlation identifier of messages
+        /// to be read.</param>
+        public ByCorrelationIdMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, string correlationId)
+            : base(messageQueue, messageConverter)
+        {
+            this.correlationId = correlationId;
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return Convert(messageQueue.PeekByCorrelationId(correlationId));
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.PeekByCorrelationId(correlationId,
+                timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+                timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+                transaction));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+                timeSpan, transaction));
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <returns>true if the message matches the selection criteria.</returns>
+        public override bool Matches(Message message)
+        {
+            // NB: case-sensitive match
+            return message.CorrelationId == correlationId;
+        }
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByIdMessageReader.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByIdMessageReader.cs?rev=1751831&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByIdMessageReader.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByIdMessageReader.cs Thu Jul  7 20:47:10 2016
@@ -0,0 +1,136 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning messages matching the specified
+    /// message identifier.
+    /// </summary>
+	public class ByIdMessageReader : AbstractMessageReader
+	{
+        private string messageId;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="messageId">The message identifier of messages to
+        /// be read.</param>
+        public ByIdMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, string messageId)
+            : base(messageQueue, messageConverter)
+        {
+            this.messageId = messageId;
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return Convert(messageQueue.PeekById(messageId));
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.PeekById(messageId, timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return Convert(messageQueue.ReceiveById(messageId));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.ReceiveById(messageId, timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveById(messageId, transaction));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveById(messageId, timeSpan,
+                transaction));
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <returns>true if the message matches the selection criteria.</returns>
+        public override bool Matches(Message message)
+        {
+            // NB: case-sensitive match
+            return message.Id == messageId;
+        }
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByLookupIdMessageReader.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByLookupIdMessageReader.cs?rev=1751831&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByLookupIdMessageReader.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/ByLookupIdMessageReader.cs Thu Jul  7 20:47:10 2016
@@ -0,0 +1,145 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning messages matching the specified
+    /// lookup identifier.
+    /// </summary>
+	public class ByLookupIdMessageReader : AbstractMessageReader
+	{
+        // Lookup identifier of the single message this reader targets.
+        private long lookupId;
+
+        /// <summary>
+        /// Creates a reader bound to one lookup identifier.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="lookupId">The lookup identifier of the message
+        /// to be read.</param>
+        public ByLookupIdMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, Int64 lookupId)
+            : base(messageQueue, messageConverter)
+        {
+            this.lookupId = lookupId;
+        }
+
+        /// <summary>
+        /// Peeks (reads without removing) the message carrying the
+        /// configured lookup identifier and converts it to NMS.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return PeekByLookupIdAndConvert();
+        }
+
+        /// <summary>
+        /// Peeks (reads without removing) the message carrying the
+        /// configured lookup identifier and converts it to NMS.
+        /// The time-out argument is not forwarded to the queue.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out (unused).</param>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            // No time-out option for receiving messages by lookup identifiers:
+            // either the message is present in the queue, or the method throws
+            // an exception immediately if the message is not in the queue.
+            return PeekByLookupIdAndConvert();
+        }
+
+        /// <summary>
+        /// Receives the message carrying the configured lookup identifier
+        /// and converts it to NMS.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return ReceiveByLookupIdAndConvert();
+        }
+
+        /// <summary>
+        /// Receives the message carrying the configured lookup identifier
+        /// and converts it to NMS. The time-out argument is not forwarded
+        /// to the queue.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out (unused).</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            // No time-out option for receiving messages by lookup identifiers:
+            // either the message is present in the queue, or the method throws
+            // an exception immediately if the message is not in the queue.
+            return ReceiveByLookupIdAndConvert();
+        }
+
+        /// <summary>
+        /// Receives, within the supplied transaction, the message carrying
+        /// the configured lookup identifier and converts it to NMS.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return ReceiveByLookupIdAndConvert(transaction);
+        }
+
+        /// <summary>
+        /// Receives, within the supplied transaction, the message carrying
+        /// the configured lookup identifier and converts it to NMS. The
+        /// time-out argument is not forwarded to the queue.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out (unused).</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            // No time-out option for receiving messages by lookup identifiers:
+            // either the message is present in the queue, or the method throws
+            // an exception immediately if the message is not in the queue.
+            return ReceiveByLookupIdAndConvert(transaction);
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria, i.e.
+        /// whether its lookup identifier equals the configured one.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <returns>true if the message matches the selection criteria.</returns>
+        public override bool Matches(Message message)
+        {
+            long candidateId = message.LookupId;
+            return candidateId == lookupId;
+        }
+
+        /// <summary>
+        /// Peeks the targeted message from the queue and maps it to NMS.
+        /// </summary>
+        /// <returns>Converted message.</returns>
+        private IMessage PeekByLookupIdAndConvert()
+        {
+            Message peeked = messageQueue.PeekByLookupId(lookupId);
+            return Convert(peeked);
+        }
+
+        /// <summary>
+        /// Receives the targeted message from the queue, outside of any
+        /// explicit transaction, and maps it to NMS.
+        /// </summary>
+        /// <returns>Converted message.</returns>
+        private IMessage ReceiveByLookupIdAndConvert()
+        {
+            Message received = messageQueue.ReceiveByLookupId(lookupId);
+            return Convert(received);
+        }
+
+        /// <summary>
+        /// Receives the targeted message from the queue within the supplied
+        /// transaction and maps it to NMS.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Converted message.</returns>
+        private IMessage ReceiveByLookupIdAndConvert(
+            MessageQueueTransaction transaction)
+        {
+            Message received = messageQueue.ReceiveByLookupId(
+                MessageLookupAction.Current, lookupId, transaction);
+            return Convert(received);
+        }
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/BySelectorMessageReader.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/BySelectorMessageReader.cs?rev=1751831&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/BySelectorMessageReader.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.MSMQ/trunk/src/main/csharp/Readers/BySelectorMessageReader.cs Thu Jul  7 20:47:10 2016
@@ -0,0 +1,290 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+using Apache.NMS;
+using Apache.NMS.Selector;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning messages matching the specified
+    /// selector.
+    /// </summary>
+	public class BySelectorMessageReader : AbstractMessageReader
+	{
+        // Selector text as supplied to the constructor.
+        private string selector;
+
+        // Context handed to the selection expression; its Message property
+        // is reassigned for every candidate message that is evaluated.
+        private MessageEvaluationContext evaluationContext;
+
+        // Parsed form of the selector, evaluated against each candidate.
+        private IBooleanExpression selectionExpression;
+
+        /// <summary>
+        /// Constructor. Parses the selector once so that it can be evaluated
+        /// against every candidate message.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="selector">The selector string.</param>
+        public BySelectorMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, string selector)
+            : base(messageQueue, messageConverter)
+        {
+            this.selector = selector;
+
+            SelectorParser selectorParser = new SelectorParser();
+            selectionExpression = selectorParser.Parse(selector);
+
+            evaluationContext = new MessageEvaluationContext(null);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// No deadline is applied.
+        /// </summary>
+        /// <returns>Peeked message, or null if none was found.</returns>
+        public override IMessage Peek()
+        {
+            return InternalPeek(DateTime.MaxValue, true);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria,
+        /// waiting at most for the specified time-out.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message, or null if none was found.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            return InternalPeek(ComputeDeadline(timeSpan), true);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// No deadline is applied.
+        /// </summary>
+        /// <returns>Received message, or null if none was found.</returns>
+        public override IMessage Receive()
+        {
+            return InternalReceive(DateTime.MaxValue, null);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message, or null if none was found.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            return InternalReceive(ComputeDeadline(timeSpan), null);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// No deadline is applied.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message, or null if none was found.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return InternalReceive(DateTime.MaxValue, transaction);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message, or null if none was found.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            return InternalReceive(ComputeDeadline(timeSpan), transaction);
+        }
+
+        /// <summary>
+        /// Receives the first message matching the selection criteria: the
+        /// message is first located by peeking, then received through its
+        /// lookup identifier (transactionally when a transaction is given).
+        /// </summary>
+        /// <param name="maxTime">Deadline (local time, compared against
+        /// DateTime.Now) after which the method stops waiting.</param>
+        /// <param name="transaction">Transaction, or null for a
+        /// non-transactional receive.</param>
+        /// <returns>Received message, or null if no matching message was
+        /// found.</returns>
+        public IMessage InternalReceive(DateTime maxTime,
+            MessageQueueTransaction transaction)
+        {
+            // In a shared connection / multi-consumer context, the message may
+            // have been consumed by another client, after it was peeked but
+            // before it was received by this client. Hence the loop.
+            // (not sure it can be shared AND transactional, though).
+            while(true)
+            {
+                // The body is not converted at this stage: the peeked message
+                // is only used to obtain the lookup identifier.
+                IMessage peekedMessage = InternalPeek(maxTime, false);
+
+                if(peekedMessage == null)
+                {
+                    return null;
+                }
+
+                try
+                {
+                    long lookupId = peekedMessage.Properties.GetLong("LookupId");
+
+                    Message message = (transaction == null ?
+                        messageQueue.ReceiveByLookupId(lookupId) :
+                        messageQueue.ReceiveByLookupId(
+                            MessageLookupAction.Current, lookupId, transaction));
+
+                    return Convert(message);
+                }
+                catch(InvalidOperationException)
+                {
+                    // The peeked message is presumably no longer in the
+                    // queue: look for the next matching message.
+                    // TODO: filter exceptions, catch only exceptions due to
+                    // unknown lookup id.
+                }
+            }
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue, matching the selection criteria.
+        /// </summary>
+        /// <param name="maxTime">Deadline (local time, compared against
+        /// DateTime.Now) after which the method stops waiting.</param>
+        /// <param name="convertBody">true if message body should be converted.</param>
+        /// <returns>Peeked message, or null if no matching message was found
+        /// before the queue reported a time-out.</returns>
+        private IMessage InternalPeek(DateTime maxTime, bool convertBody)
+        {
+            // The cursor is disposed when the scan ends, whatever the
+            // outcome, so that it is not left open.
+            using(Cursor cursor = messageQueue.CreateCursor())
+            {
+                PeekAction action = PeekAction.Current;
+
+                while(true)
+                {
+                    Message msmqMessage;
+
+                    try
+                    {
+                        // The remaining time is recomputed for each peek, so
+                        // that the overall wait is bounded by the deadline.
+                        msmqMessage = messageQueue.Peek(
+                            GetRemainingTime(maxTime), cursor, action);
+                    }
+                    catch(MessageQueueException exc)
+                    {
+                        if(exc.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
+                        {
+                            // Rethrow as is, preserving the stack trace.
+                            throw;
+                        }
+
+                        // Time-out: no (more) message to examine.
+                        return null;
+                    }
+
+                    if(msmqMessage == null)
+                    {
+                        return null;
+                    }
+
+                    IMessage nmsMessage = InternalMatch(msmqMessage, convertBody);
+
+                    if(nmsMessage != null)
+                    {
+                        return nmsMessage;
+                    }
+
+                    action = PeekAction.Next;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria. If matched
+        /// the method returns the converted NMS message. Else it returns null.
+        /// </summary>
+        /// <param name="message">The MSMQ message to check.</param>
+        /// <param name="convertBody">true if the message body should be
+        /// converted.</param>
+        /// <returns>The matching message converted to NMS, or null.</returns>
+        private IMessage InternalMatch(Message message, bool convertBody)
+        {
+            IMessage nmsMessage = ConvertForSelection(message);
+
+            evaluationContext.Message = nmsMessage;
+
+            if(!selectionExpression.Matches(evaluationContext))
+            {
+                return null;
+            }
+
+            // With the extended converter, the body has not been converted
+            // yet: do it now, only for matching messages and on request.
+            if(messageConverterEx != null && convertBody)
+            {
+                messageConverterEx.ConvertMessageBodyToNMS(message, nmsMessage);
+            }
+
+            return nmsMessage;
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <returns>true if the message matches the selection criteria.</returns>
+        public override bool Matches(Message message)
+        {
+            evaluationContext.Message = ConvertForSelection(message);
+
+            return selectionExpression.Matches(evaluationContext);
+        }
+
+        /// <summary>
+        /// Converts an MSMQ message to the NMS message against which the
+        /// selector is evaluated. When the extended converter is available,
+        /// it is asked for a partial conversion (second argument false).
+        /// </summary>
+        /// <param name="message">The MSMQ message to convert.</param>
+        /// <returns>The NMS message to evaluate.</returns>
+        private IMessage ConvertForSelection(Message message)
+        {
+            if(messageConverterEx == null)
+            {
+                return messageConverter.ToNmsMessage(message);
+            }
+
+            return messageConverterEx.ToNmsMessage(message, false);
+        }
+
+        /// <summary>
+        /// Converts a time-out into a deadline, saturating instead of
+        /// overflowing when the time-out is very large or negative.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>The deadline, in local time.</returns>
+        private static DateTime ComputeDeadline(TimeSpan timeSpan)
+        {
+            DateTime now = DateTime.Now;
+
+            if(timeSpan <= TimeSpan.Zero)
+            {
+                return now;
+            }
+
+            if(timeSpan >= DateTime.MaxValue - now)
+            {
+                return DateTime.MaxValue;
+            }
+
+            return now + timeSpan;
+        }
+
+        /// <summary>
+        /// Computes the time left before a deadline, clamped to the range
+        /// from TimeSpan.Zero to MessageQueue.InfiniteTimeout.
+        /// </summary>
+        /// <param name="maxTime">Deadline, in local time.</param>
+        /// <returns>The remaining time, usable as a queue time-out.</returns>
+        private static TimeSpan GetRemainingTime(DateTime maxTime)
+        {
+            TimeSpan remaining = maxTime - DateTime.Now;
+
+            if(remaining <= TimeSpan.Zero)
+            {
+                return TimeSpan.Zero;
+            }
+
+            // NOTE(review): time-outs above MessageQueue.InfiniteTimeout are
+            // presumably rejected by the queue - confirm against the
+            // MessageQueue.Peek documentation.
+            if(remaining > MessageQueue.InfiniteTimeout)
+            {
+                return MessageQueue.InfiniteTimeout;
+            }
+
+            return remaining;
+        }
+	}
+}



Mime
View raw message