activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [50/50] [abbrv] activemq-nms-msmq git commit: Apply patch for AMQNET-556 from Stephane Ramet. Thanks Stephane!
Date Tue, 07 Mar 2017 19:39:30 GMT
Apply patch for AMQNET-556 from Stephane Ramet.  Thanks Stephane!


Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/commit/7274a80a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/tree/7274a80a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/diff/7274a80a

Branch: refs/heads/master
Commit: 7274a80a76cb827785d576f57f413b02f60ae83c
Parents: 16d8f06
Author: Jim Gomes <jgomes@apache.org>
Authored: Sat Sep 10 23:09:20 2016 +0000
Committer: Jim Gomes <jgomes@apache.org>
Committed: Sat Sep 10 23:09:20 2016 +0000

----------------------------------------------------------------------
 Apache.NMS.MSMQ.Test.nunit                      |   12 +-
 msmqprovider-test.config                        |   35 +
 nant.build                                      |    7 +-
 src/main/csharp/Connection.cs                   |  257 +-
 src/main/csharp/ConnectionMetaData.cs           |  214 +-
 src/main/csharp/DefaultMessageConverter.cs      |   31 +-
 src/main/csharp/IMessageConverterEx.cs          |   88 +-
 src/main/csharp/MessageConsumer.cs              |  191 +-
 src/main/csharp/MessageProducer.cs              |   73 +-
 src/main/csharp/Queue.cs                        |   73 +-
 src/main/csharp/QueueBrowser.cs                 |  336 +--
 .../csharp/Readers/AbstractMessageReader.cs     |  252 +-
 .../Readers/ByCorrelationIdMessageReader.cs     |  278 +--
 src/main/csharp/Readers/ByIdMessageReader.cs    |  272 +-
 .../csharp/Readers/ByLookupIdMessageReader.cs   |  360 +--
 .../csharp/Readers/BySelectorMessageReader.cs   |  576 +++--
 src/main/csharp/Readers/IMessageReader.cs       |  186 +-
 src/main/csharp/Readers/MessageReaderUtil.cs    |  182 +-
 .../csharp/Readers/NonFilteringMessageReader.cs |  256 +-
 src/main/csharp/Selector/ANDExpression.cs       |   93 +-
 .../csharp/Selector/AlignedNumericValues.cs     |  349 ++-
 .../csharp/Selector/ArithmeticExpression.cs     |  113 +-
 src/main/csharp/Selector/BinaryExpression.cs    |  117 +-
 .../csharp/Selector/BooleanCastExpression.cs    |   89 +-
 .../Selector/BooleanConstantExpression.cs       |   73 +-
 .../csharp/Selector/BooleanUnaryExpression.cs   |   77 +-
 .../csharp/Selector/ComparisonExpression.cs     |  323 ++-
 src/main/csharp/Selector/ConstantExpression.cs  |  311 ++-
 src/main/csharp/Selector/DivideExpression.cs    |  133 +-
 src/main/csharp/Selector/EqualExpression.cs     |   93 +-
 src/main/csharp/Selector/GreaterExpression.cs   |   83 +-
 .../csharp/Selector/GreaterOrEqualExpression.cs |   85 +-
 src/main/csharp/Selector/IBooleanExpression.cs  |   69 +-
 src/main/csharp/Selector/IExpression.cs         |   67 +-
 src/main/csharp/Selector/InExpression.cs        |  195 +-
 src/main/csharp/Selector/IsNullExpression.cs    |  117 +-
 src/main/csharp/Selector/LesserExpression.cs    |   83 +-
 .../csharp/Selector/LesserOrEqualExpression.cs  |   85 +-
 src/main/csharp/Selector/LikeExpression.cs      |  247 +-
 src/main/csharp/Selector/LogicExpression.cs     |   95 +-
 .../csharp/Selector/MessageEvaluationContext.cs |  153 +-
 src/main/csharp/Selector/MinusExpression.cs     |  133 +-
 src/main/csharp/Selector/ModExpression.cs       |  133 +-
 src/main/csharp/Selector/MultiplyExpression.cs  |  133 +-
 src/main/csharp/Selector/NOTExpression.cs       |   89 +-
 src/main/csharp/Selector/NegateExpression.cs    |  101 +-
 src/main/csharp/Selector/ORExpression.cs        |   91 +-
 src/main/csharp/Selector/ParseException.cs      |  394 +--
 src/main/csharp/Selector/PlusExpression.cs      |  135 +-
 src/main/csharp/Selector/PropertyExpression.cs  |  105 +-
 src/main/csharp/Selector/SelectorParser.cs      | 2341 +++++++++---------
 src/main/csharp/Selector/SelectorParser.csc     |   23 +-
 .../csharp/Selector/SelectorParserConstants.cs  |  150 +-
 .../Selector/SelectorParserTokenManager.cs      | 2053 ++++++++-------
 src/main/csharp/Selector/SimpleCharStream.cs    |  732 +++---
 src/main/csharp/Selector/Token.cs               |  156 +-
 src/main/csharp/Selector/TokenMgrError.cs       |  260 +-
 src/main/csharp/Selector/UnaryExpression.cs     |  131 +-
 src/main/csharp/Session.cs                      |   20 +-
 src/test/csharp/AsyncConsumeTest.cs             |  228 ++
 src/test/csharp/BadConsumeTest.cs               |   69 +
 src/test/csharp/BytesMessageTest.cs             |  138 ++
 src/test/csharp/Commands/BytesMessage.cs        |  511 ++++
 src/test/csharp/Commands/Destination.cs         |  380 +++
 src/test/csharp/Commands/MapMessage.cs          |   90 +
 src/test/csharp/Commands/Message.cs             |  329 +++
 src/test/csharp/Commands/ObjectMessage.cs       |   44 +
 src/test/csharp/Commands/Queue.cs               |   75 +
 src/test/csharp/Commands/StreamMessage.cs       |  901 +++++++
 src/test/csharp/Commands/TempDestination.cs     |   70 +
 src/test/csharp/Commands/TempQueue.cs           |   81 +
 src/test/csharp/Commands/TempTopic.cs           |   77 +
 src/test/csharp/Commands/TextMessage.cs         |   70 +
 src/test/csharp/Commands/Topic.cs               |   74 +
 src/test/csharp/ConnectionTest.cs               |  194 ++
 src/test/csharp/ConsumerTest.cs                 |  596 +++++
 src/test/csharp/DurableTest.cs                  |  267 ++
 src/test/csharp/EndianBinaryReaderTest.cs       |  162 ++
 src/test/csharp/EndianBinaryWriterTest.cs       |  202 ++
 src/test/csharp/EndianTest.cs                   |  131 +
 .../csharp/ForeignMessageTransformationTest.cs  |  315 +++
 src/test/csharp/MSMQAsyncConsumeTest.cs         |  107 +
 src/test/csharp/MSMQBadConsumeTest.cs           |   50 +
 src/test/csharp/MSMQBytesMessageTest.cs         |   51 +
 src/test/csharp/MSMQConnectionTest.cs           |  107 +
 src/test/csharp/MSMQConsumerTest.cs             |  154 ++
 src/test/csharp/MSMQDurableTest.cs              |   72 +
 .../MSMQForeignMessageTransformationTest.cs     |   86 +
 src/test/csharp/MSMQMapMessageTest.cs           |   49 +
 src/test/csharp/MSMQMessageSelectorTest.cs      |  169 ++
 src/test/csharp/MSMQMessageTest.cs              |   42 +
 src/test/csharp/MSMQMessageTransformerTest.cs   |   48 +
 src/test/csharp/MSMQNMSPropertyTest.cs          |   41 +
 src/test/csharp/MSMQProducerTest.cs             |   65 +
 src/test/csharp/MSMQRequestResponseTest.cs      |   42 +
 src/test/csharp/MSMQStreamMessageTest.cs        |   41 +
 .../csharp/MSMQTempDestinationDeletionTest.cs   |   48 +
 src/test/csharp/MSMQTempDestinationTest.cs      |   68 +
 src/test/csharp/MSMQTestSupport.cs              |   50 +
 src/test/csharp/MSMQTextMessageTest.cs          |   41 +
 src/test/csharp/MSMQTransactionTest.cs          |  115 +
 src/test/csharp/MSMQXmlMessageTest.cs           |   52 +
 src/test/csharp/MapMessageTest.cs               |  208 ++
 src/test/csharp/MessageSelectorTest.cs          |  221 ++
 src/test/csharp/MessageTest.cs                  |  147 ++
 src/test/csharp/MessageTransformerTest.cs       |  124 +
 src/test/csharp/NMSPropertyTest.cs              |   82 +
 src/test/csharp/NMSTest.cs                      |  505 ++++
 src/test/csharp/NMSTestSupport.cs               |  637 +++++
 src/test/csharp/NMSTracer.cs                    |   87 +
 src/test/csharp/PrimitiveMapTest.cs             |  170 ++
 src/test/csharp/ProducerTest.cs                 |  113 +
 src/test/csharp/RedeliveryPolicyTest.cs         |  135 +
 src/test/csharp/RequestResponseTest.cs          |   74 +
 src/test/csharp/StreamMessageTest.cs            |  111 +
 src/test/csharp/TempDestinationDeletionTest.cs  |   80 +
 src/test/csharp/TempDestinationTest.cs          |  175 ++
 src/test/csharp/TextMessageTest.cs              |   71 +
 src/test/csharp/TransactionTest.cs              |  439 ++++
 src/test/csharp/XmlMessageTest.cs               |  186 ++
 vs2008-msmq-test.csproj                         |   65 +-
 vs2008-msmq.csproj                              |    3 +-
 vs2008-msmq.sln                                 |   60 +-
 123 files changed, 17297 insertions(+), 6807 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/Apache.NMS.MSMQ.Test.nunit
----------------------------------------------------------------------
diff --git a/Apache.NMS.MSMQ.Test.nunit b/Apache.NMS.MSMQ.Test.nunit
index b321b9d..4dc9dc2 100644
--- a/Apache.NMS.MSMQ.Test.nunit
+++ b/Apache.NMS.MSMQ.Test.nunit
@@ -1,7 +1,7 @@
-<NUnitProject>
-  <Settings activeconfig="Default" />
-  <Config name="Default" binpathtype="Auto">
-    <assembly path="Apache.NMS.Test.dll" />
-    <assembly path="Apache.NMS.MSMQ.Test.dll" />
-  </Config>
+<NUnitProject>
+  <Settings activeconfig="Default" />
+  <Config name="Default" binpathtype="Auto">
+    <!--<assembly path="Apache.NMS.Test.dll" />-->
+    <assembly path="Apache.NMS.MSMQ.Test.dll" />
+  </Config>
 </NUnitProject>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/msmqprovider-test.config
----------------------------------------------------------------------
diff --git a/msmqprovider-test.config b/msmqprovider-test.config
new file mode 100644
index 0000000..65e5dff
--- /dev/null
+++ b/msmqprovider-test.config
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+<configuration>
+	<testURI value="msmq://localhost">
+		<userName value="guest"/>
+		<passWord value="guest"/>
+		<defaultTestQueue value="queue://.\Private$\TestQ"/>
+		<defaultTestTopic value=""/>
+		<defaultTestQueue2 value="queue://.\Private$\TestQ2"/>
+		<defaultTestTopic2 value=""/>
+		<durableConsumerTestTopic value=""/>
+		<messageSelectorTestQueue value="queue://.\Private$\TestQ"/>
+		<messageSelectorTestTopic value=""/>
+		<deletionTestQueue value="queue://.\Private$\TestQ"/>
+		<deletionTestTopic value=""/>
+		<deletionTestTempQueue value=""/>
+		<deletionTestTempTopic value=""/>
+		<transactionTestQueue value="queue://.\Private$\TestQT"/>
+	</testURI>
+</configuration>

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/nant.build
----------------------------------------------------------------------
diff --git a/nant.build b/nant.build
index 0df9a29..67d0fa0 100644
--- a/nant.build
+++ b/nant.build
@@ -1,4 +1,4 @@
-<?xml version="1.0"?>
+<?xml version="1.0"?>
 <!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
@@ -31,8 +31,6 @@
     <property name="nunit.dll" value="${basedir}/lib/NUnit/${current.build.framework}/nunit.framework.dll" dynamic="true" />
     <property name="Apache.NMS.dll" value="${basedir}/lib/Apache.NMS/${current.build.framework}/Apache.NMS.dll" dynamic="true" />
     <property name="Apache.NMS.pdb" value="${basedir}/lib/Apache.NMS/${current.build.framework}/Apache.NMS.pdb" dynamic="true" />
-    <property name="Apache.NMS.Test.dll" value="${basedir}/lib/Apache.NMS/${current.build.framework}//Apache.NMS.Test.dll" dynamic="true" />
-    <property name="Apache.NMS.Test.pdb" value="${basedir}/lib/Apache.NMS/${current.build.framework}/Apache.NMS.Test.pdb" dynamic="true" />
     <property name="NUnit.Projectfile" value="Apache.NMS.MSMQ.Test.nunit" />
 
     <!-- Skip certain frameworks, since MSMQ is not supported on those platforms. -->
@@ -77,7 +75,6 @@
             <include name="${current.build.framework.assembly.dir}/System.Xml.dll" />
             <include name="${current.build.framework.assembly.dir}/System.Messaging.dll" />
             <include name="${Apache.NMS.dll}" />
-            <include name="${Apache.NMS.Test.dll}" />
             <include name="${build.bin.dir}/${project.name}.dll" />
             <include name="${nunit.dll}" />
         </assemblyfileset>
@@ -88,8 +85,6 @@
             <include name="nmsprovider-*.config" />
             <include name="${Apache.NMS.dll}" />
             <include name="${Apache.NMS.pdb}" />
-            <include name="${Apache.NMS.Test.dll}" />
-            <include name="${Apache.NMS.Test.pdb}" />
             <include name="${nunit.dll}" />
             <include name="${NUnit.Projectfile}" />
         </fileset>

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/Connection.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 9207708..5df73f4 100644
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Threading;
 
 namespace Apache.NMS.MSMQ
 {
@@ -26,21 +27,101 @@ namespace Apache.NMS.MSMQ
     ///
     public class Connection : IConnection
     {
-        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
-        private IMessageConverter messageConverter = new DefaultMessageConverter();
+        #region Constructors
 
-        private IRedeliveryPolicy redeliveryPolicy;
-        private ConnectionMetaData metaData = null;
-        private bool connected;
-        private bool closed;
-        private string clientId;
+        public Connection()
+        {
+            // now lets send the connection and see if we get an ack/nak
+            // TODO: establish a connection
+        }
+
+        #endregion
+
+        #region Connection state
+
+        public enum ConnectionState
+        {
+            Created,
+            Connected,
+            Starting,
+            Started,
+            Stopping,
+            Stopped,
+            Closed
+        }
+
+        private ConnectionState state = ConnectionState.Created;
+
+        public class StateChangeEventArgs : EventArgs
+        {
+            public StateChangeEventArgs(ConnectionState originalState,
+                ConnectionState currentState)
+            {
+                this.originalState = originalState;
+                this.currentState = currentState;
+            }
+
+            private ConnectionState originalState;
+            public ConnectionState OriginalState
+            {
+                get { return originalState; }
+            }
+
+            private ConnectionState currentState;
+            public ConnectionState CurrentState
+            {
+                get { return currentState; }
+            }
+        }
+
+        public delegate void StateChangeEventHandler(object sender, StateChangeEventArgs e);
+
+        public event StateChangeEventHandler ConnectionStateChange;
+
+        private void ChangeState(ConnectionState newState)
+        {
+            if(ConnectionStateChange != null)
+            {
+                ConnectionStateChange(this, 
+                    new StateChangeEventArgs(this.state, newState));
+            }
+
+            this.state = newState;
+        }
+
+        private object stateLock = new object();
+
+        #endregion
+
+        #region Start & stop
 
         /// <summary>
         /// Starts message delivery for this connection.
         /// </summary>
         public void Start()
         {
-            CheckConnected();
+            lock(stateLock)
+            {
+                switch(state)
+                {
+                    case ConnectionState.Created:
+                    case ConnectionState.Connected:
+                    case ConnectionState.Stopped:
+                        ChangeState(ConnectionState.Starting);
+                        ChangeState(ConnectionState.Started);
+                        break;
+
+                    case ConnectionState.Stopping:
+                        throw new NMSException("Connection stopping");
+
+                    case ConnectionState.Closed:
+                        throw new NMSException("Connection closed");
+
+                    case ConnectionState.Starting:
+                    case ConnectionState.Started:
+                        break;
+                }
+            }
         }
 
         /// <summary>
@@ -49,7 +130,7 @@ namespace Apache.NMS.MSMQ
         /// </summary>
         public bool IsStarted
         {
-            get { return true; }
+            get { return state == ConnectionState.Started; }
         }
 
         /// <summary>
@@ -57,9 +138,65 @@ namespace Apache.NMS.MSMQ
         /// </summary>
         public void Stop()
         {
-            CheckConnected();
+            lock(stateLock)
+            {
+                switch(state)
+                {
+                    case ConnectionState.Started:
+                        ChangeState(ConnectionState.Stopping);
+                        ChangeState(ConnectionState.Stopped);
+                        break;
+
+                    case ConnectionState.Starting:
+                        throw new NMSException("Connection starting");
+
+                    case ConnectionState.Closed:
+                        throw new NMSException("Connection closed");
+
+                    case ConnectionState.Created:
+                    case ConnectionState.Connected:
+                    case ConnectionState.Stopping:
+                    case ConnectionState.Stopped:
+                        break;
+                }
+            }
+        }
+
+        #endregion
+
+        #region Close & dispose
+
+        public void Close()
+        {
+            if(!IsClosed)
+            {
+                Stop();
+
+                state = ConnectionState.Closed;
+            }
+        }
+
+        public bool IsClosed
+        {
+            get { return state == ConnectionState.Closed; }
+        }
+
+        public void Dispose()
+        {
+            try
+            {
+                Close();
+            }
+            catch
+            {
+                state = ConnectionState.Closed;
+            }
         }
 
+        #endregion
+
+        #region Create session
+
         /// <summary>
         /// Creates a new session to work on this connection
         /// </summary>
@@ -73,14 +210,16 @@ namespace Apache.NMS.MSMQ
         /// </summary>
         public ISession CreateSession(AcknowledgementMode mode)
         {
-            CheckConnected();
+            if(IsClosed)
+            {
+                throw new NMSException("Connection closed");
+            }
             return new Session(this, mode);
         }
 
-        public void Dispose()
-        {
-            closed = true;
-        }
+        #endregion
+
+        #region Connection properties
 
         /// <summary>
         /// The default timeout for network requests.
@@ -91,24 +230,27 @@ namespace Apache.NMS.MSMQ
             set { }
         }
 
+        private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge;
         public AcknowledgementMode AcknowledgementMode
         {
             get { return acknowledgementMode; }
             set { acknowledgementMode = value; }
         }
 
+        private IMessageConverter messageConverter = new DefaultMessageConverter();
         public IMessageConverter MessageConverter
         {
             get { return messageConverter; }
             set { messageConverter = value; }
         }
 
+        private string clientId;
         public string ClientId
         {
             get { return clientId; }
             set
             {
-                if(connected)
+                if(state != ConnectionState.Created)
                 {
                     throw new NMSException("You cannot change the ClientId once the Connection is connected");
                 }
@@ -116,6 +258,7 @@ namespace Apache.NMS.MSMQ
             }
         }
 
+        private IRedeliveryPolicy redeliveryPolicy;
         /// <summary>
         /// Get/or set the redelivery policy for this connection.
         /// </summary>
@@ -125,6 +268,19 @@ namespace Apache.NMS.MSMQ
             set { this.redeliveryPolicy = value; }
         }
 
+        private ConnectionMetaData metaData = null;
+        /// <summary>
+        /// Gets the Meta Data for the NMS Connection instance.
+        /// </summary>
+        public IConnectionMetaData MetaData
+        {
+            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
+        }
+
+        #endregion
+
+        #region Transformer delegates
+
         private ConsumerTransformerDelegate consumerTransformer;
         public ConsumerTransformerDelegate ConsumerTransformer
         {
@@ -139,57 +295,18 @@ namespace Apache.NMS.MSMQ
             set { this.producerTransformer = value; }
         }
 
-        /// <summary>
-        /// Gets the Meta Data for the NMS Connection instance.
-        /// </summary>
-        public IConnectionMetaData MetaData
-        {
-            get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); }
-        }
+        #endregion
+
+        #region Exception & transport listeners
 
         /// <summary>
         /// A delegate that can receive transport level exceptions.
         /// </summary>
         public event ExceptionListener ExceptionListener;
 
-        /// <summary>
-        /// An asynchronous listener that is notified when a Fault tolerant connection
-        /// has been interrupted.
-        /// </summary>
-        public event ConnectionInterruptedListener ConnectionInterruptedListener;
-
-        /// <summary>
-        /// An asynchronous listener that is notified when a Fault tolerant connection
-        /// has been resumed.
-        /// </summary>
-        public event ConnectionResumedListener ConnectionResumedListener;
-
-        protected void CheckConnected()
-        {
-            if(closed)
-            {
-                throw new NMSException("Connection Closed");
-            }
-            if(!connected)
-            {
-                connected = true;
-                // now lets send the connection and see if we get an ack/nak
-                // TODO: establish a connection
-            }
-        }
-
-        public void Close()
-        {
-            Dispose();
-        }
-
-        public void PurgeTempDestinations()
-        {
-        }
-
         public void HandleException(Exception e)
         {
-            if(ExceptionListener != null && !this.closed)
+            if(ExceptionListener != null && !this.IsClosed)
             {
                 ExceptionListener(e);
             }
@@ -199,11 +316,17 @@ namespace Apache.NMS.MSMQ
             }
         }
 
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been interrupted.
+        /// </summary>
+        public event ConnectionInterruptedListener ConnectionInterruptedListener;
+
         public void HandleTransportInterrupted()
         {
             Tracer.Debug("Transport has been Interrupted.");
 
-            if(this.ConnectionInterruptedListener != null && !this.closed)
+            if(this.ConnectionInterruptedListener != null && !this.IsClosed)
             {
                 try
                 {
@@ -215,11 +338,17 @@ namespace Apache.NMS.MSMQ
             }
         }
 
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant connection
+        /// has been resumed.
+        /// </summary>
+        public event ConnectionResumedListener ConnectionResumedListener;
+
         public void HandleTransportResumed()
         {
             Tracer.Debug("Transport has resumed normal operation.");
 
-            if(this.ConnectionResumedListener != null && !this.closed)
+            if(this.ConnectionResumedListener != null && !this.IsClosed)
             {
                 try
                 {
@@ -230,5 +359,11 @@ namespace Apache.NMS.MSMQ
                 }
             }
         }
+
+        #endregion
+
+        public void PurgeTempDestinations()
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/ConnectionMetaData.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/ConnectionMetaData.cs b/src/main/csharp/ConnectionMetaData.cs
index 5a305c0..9bba3b6 100644
--- a/src/main/csharp/ConnectionMetaData.cs
+++ b/src/main/csharp/ConnectionMetaData.cs
@@ -1,107 +1,107 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Reflection;
-
-namespace Apache.NMS.MSMQ
-{
-	/// <summary>
-	/// Implements the Connection Meta-Data feature for Apache.NMS.MSMQ
-	/// </summary>
-	public class ConnectionMetaData : IConnectionMetaData
-	{
-		private int nmsMajorVersion;
-		private int nmsMinorVersion;
-
-		private string nmsProviderName;
-		private string nmsVersion;
-
-		private int providerMajorVersion;
-		private int providerMinorVersion;
-		private string providerVersion;
-
-		private string[] nmsxProperties;
-
-		public ConnectionMetaData()
-		{
-			Assembly self = Assembly.GetExecutingAssembly();
-			AssemblyName asmName = self.GetName();
-
-			this.nmsProviderName = asmName.Name;
-			this.providerMajorVersion = asmName.Version.Major;
-			this.providerMinorVersion = asmName.Version.Minor;
-			this.providerVersion = asmName.Version.ToString();
-
-			this.nmsxProperties = new String[] { };
-
-			foreach(AssemblyName name in self.GetReferencedAssemblies())
-			{
-				if(0 == string.Compare(name.Name, "Apache.NMS", true))
-				{
-					this.nmsMajorVersion = name.Version.Major;
-					this.nmsMinorVersion = name.Version.Minor;
-					this.nmsVersion = name.Version.ToString();
-
-					return;
-				}
-			}
-
-			throw new NMSException("Could not find a reference to the Apache.NMS Assembly.");
-		}
-
-		public int NMSMajorVersion
-		{
-			get { return this.nmsMajorVersion; }
-		}
-
-		public int NMSMinorVersion
-		{
-			get { return this.nmsMinorVersion; }
-		}
-
-		public string NMSProviderName
-		{
-			get { return this.nmsProviderName; }
-		}
-
-		public string NMSVersion
-		{
-			get { return this.nmsVersion; }
-		}
-
-		public string[] NMSXPropertyNames
-		{
-			get { return this.nmsxProperties; }
-		}
-
-		public int ProviderMajorVersion
-		{
-			get { return this.providerMajorVersion; }
-		}
-
-		public int ProviderMinorVersion
-		{
-			get { return this.providerMinorVersion; }
-		}
-
-		public string ProviderVersion
-		{
-			get { return this.providerVersion; }
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Reflection;
+
+namespace Apache.NMS.MSMQ
+{
+	/// <summary>
+	/// Implements the Connection Meta-Data feature for Apache.NMS.MSMQ
+	/// </summary>
+	public class ConnectionMetaData : IConnectionMetaData
+	{
+		private int nmsMajorVersion;
+		private int nmsMinorVersion;
+
+		private string nmsProviderName;
+		private string nmsVersion;
+
+		private int providerMajorVersion;
+		private int providerMinorVersion;
+		private string providerVersion;
+
+		private string[] nmsxProperties;
+
+		public ConnectionMetaData()
+		{
+			Assembly self = Assembly.GetExecutingAssembly();
+			AssemblyName asmName = self.GetName();
+
+			this.nmsProviderName = asmName.Name;
+			this.providerMajorVersion = asmName.Version.Major;
+			this.providerMinorVersion = asmName.Version.Minor;
+			this.providerVersion = asmName.Version.ToString();
+
+			this.nmsxProperties = new String[] { };
+
+			foreach(AssemblyName name in self.GetReferencedAssemblies())
+			{
+				if(0 == string.Compare(name.Name, "Apache.NMS", true))
+				{
+					this.nmsMajorVersion = name.Version.Major;
+					this.nmsMinorVersion = name.Version.Minor;
+					this.nmsVersion = name.Version.ToString();
+
+					return;
+				}
+			}
+
+			throw new NMSException("Could not find a reference to the Apache.NMS Assembly.");
+		}
+
+		public int NMSMajorVersion
+		{
+			get { return this.nmsMajorVersion; }
+		}
+
+		public int NMSMinorVersion
+		{
+			get { return this.nmsMinorVersion; }
+		}
+
+		public string NMSProviderName
+		{
+			get { return this.nmsProviderName; }
+		}
+
+		public string NMSVersion
+		{
+			get { return this.nmsVersion; }
+		}
+
+		public string[] NMSXPropertyNames
+		{
+			get { return this.nmsxProperties; }
+		}
+
+		public int ProviderMajorVersion
+		{
+			get { return this.providerMajorVersion; }
+		}
+
+		public int ProviderMinorVersion
+		{
+			get { return this.providerMinorVersion; }
+		}
+
+		public string ProviderVersion
+		{
+			get { return this.providerVersion; }
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/DefaultMessageConverter.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/DefaultMessageConverter.cs b/src/main/csharp/DefaultMessageConverter.cs
index 83097fc..b726474 100644
--- a/src/main/csharp/DefaultMessageConverter.cs
+++ b/src/main/csharp/DefaultMessageConverter.cs
@@ -118,16 +118,16 @@ namespace Apache.NMS.MSMQ
             // Populate property data
             foreach(object keyObject in message.Properties.Keys)
             {
-              string key = (keyObject as string);
-              object val = message.Properties.GetString(key);
-              if(!SetLabelAsNMSType && string.Compare(key, "Label", true) == 0 && val != null)
-              {
-				msmqMessage.Label = val.ToString();
-              }
-              else
-              {
-				propertyData[key] = val;
-              }
+                string key = (keyObject as string);
+                object val = message.Properties[key];
+                if(!SetLabelAsNMSType && string.Compare(key, "Label", true) == 0 && val != null)
+                {
+				    msmqMessage.Label = val.ToString();
+                }
+                else
+                {
+				    propertyData[key] = val;
+                }
             }
 
 			// Store the NMS property data in the extension area
@@ -418,6 +418,8 @@ namespace Apache.NMS.MSMQ
 				result = baseMessage;
 			}
 
+            result.ReadOnlyBody = true;
+
 			return result;
 		}
 
@@ -442,6 +444,7 @@ namespace Apache.NMS.MSMQ
 			else if(message is BytesMessage)
 			{
 				BytesMessage bytesMessage = message as BytesMessage;
+                bytesMessage.Reset();
 				answer.BodyStream.Write(bytesMessage.Content, 0, bytesMessage.Content.Length);
 				answer.AppSpecific = (int) NMSMessageType.BytesMessage;
 			}
@@ -604,12 +607,14 @@ namespace Apache.NMS.MSMQ
         /// <result>MSMQ queue.</result>
 		public MessageQueue ToMsmqDestination(IDestination destination)
 		{
-			if(null == destination)
+            Queue queue = destination as Queue;
+
+			if(destination == null)
 			{
 				return null;
 			}
 
-			return new MessageQueue((destination as Destination).Path);
+			return queue.MSMQMessageQueue;
 		}
 
         /// <summary>
@@ -625,7 +630,7 @@ namespace Apache.NMS.MSMQ
 				return null;
 			}
 
-			return new Queue(destinationQueue.Path);
+			return new Queue(destinationQueue);
 		}
 
         #endregion

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/IMessageConverterEx.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/IMessageConverterEx.cs b/src/main/csharp/IMessageConverterEx.cs
index 92be928..fcd1354 100644
--- a/src/main/csharp/IMessageConverterEx.cs
+++ b/src/main/csharp/IMessageConverterEx.cs
@@ -1,44 +1,44 @@
-using System.Messaging;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.NMS.MSMQ
-{
-    /// <summary>
-    /// Extended IMessageConverter interface supporting new methods for
-    /// optimizing message selection through "selectors".
-    /// The original IMessageConverter is maintained for compatibility
-    /// reasons with existing clients implementing it.
-    /// </summary>
-	public interface IMessageConverterEx : IMessageConverter
-	{
-        /// <summary>
-        /// Converts the specified MSMQ message to an equivalent NMS message.
-        /// </summary>
-        /// <param name="message">MSMQ message to be converted.</param>
-        /// <param name="convertBody">true if message body should be converted.</param>
-        /// <result>Converted NMS message.</result>
-		IMessage ToNmsMessage(Message message, bool convertBody);
-
-        /// <summary>
-        /// Converts an MSMQ message body to the equivalent NMS message body.
-        /// </summary>
-        /// <param name="message">Source MSMQ message.</param>
-        /// <param name="answer">Target NMS message.</param>
-		void ConvertMessageBodyToNMS(Message message, IMessage answer);
-	}
-}
+using System.Messaging;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ
+{
+    /// <summary>
+    /// Extended IMessageConverter interface supporting new methods for
+    /// optimizing message selection through "selectors".
+    /// The original IMessageConverter is maintained for compatibility
+    /// reasons with existing clients implementing it.
+    /// </summary>
+	public interface IMessageConverterEx : IMessageConverter
+	{
+        /// <summary>
+        /// Converts the specified MSMQ message to an equivalent NMS message.
+        /// </summary>
+        /// <param name="message">MSMQ message to be converted.</param>
+        /// <param name="convertBody">true if message body should be converted.</param>
+        /// <returns>Converted NMS message.</returns>
+		IMessage ToNmsMessage(Message message, bool convertBody);
+
+        /// <summary>
+        /// Converts an MSMQ message body to the equivalent NMS message body.
+        /// </summary>
+        /// <param name="message">Source MSMQ message.</param>
+        /// <param name="answer">Target NMS message.</param>
+		void ConvertMessageBodyToNMS(Message message, IMessage answer);
+	}
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index eaaed5c..c69c6f5 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -32,9 +32,6 @@ namespace Apache.NMS.MSMQ
         private readonly Session session;
         private readonly AcknowledgementMode acknowledgementMode;
         private MessageQueue messageQueue;
-        private Thread asyncDeliveryThread = null;
-        private AutoResetEvent pause = new AutoResetEvent(false);
-        private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
 
         private ConsumerTransformerDelegate consumerTransformer;
         public ConsumerTransformerDelegate ConsumerTransformer
@@ -81,6 +78,8 @@ namespace Apache.NMS.MSMQ
                 messageQueue, session.MessageConverter, selector);
         }
 
+        #region Asynchronous delivery
+
         private int listenerCount = 0;
         private event MessageListener listener;
         public event MessageListener Listener
@@ -89,7 +88,13 @@ namespace Apache.NMS.MSMQ
             {
                 listener += value;
                 listenerCount++;
-                StartAsyncDelivery();
+
+                session.Connection.ConnectionStateChange += OnConnectionStateChange;
+
+                if(session.Connection.IsStarted)
+                {
+                    StartAsyncDelivery();
+                }
             }
 
             remove
@@ -100,87 +105,37 @@ namespace Apache.NMS.MSMQ
                     listenerCount--;
                 }
 
-                if(0 == listenerCount)
+                if(listenerCount == 0)
                 {
+                    session.Connection.ConnectionStateChange -= OnConnectionStateChange;
+
                     StopAsyncDelivery();
                 }
             }
         }
 
-        public IMessage Receive()
-        {
-            IMessage nmsMessage = null;
-
-            if(messageQueue != null)
-            {
-                nmsMessage = reader.Receive();
-                nmsMessage = TransformMessage(nmsMessage);
-            }
-
-            return nmsMessage;
-        }
-
-        public IMessage Receive(TimeSpan timeout)
-        {
-            IMessage nmsMessage = null;
-
-            if(messageQueue != null)
-            {
-                nmsMessage = reader.Receive(timeout);
-                nmsMessage = TransformMessage(nmsMessage);
-            }
-
-            return nmsMessage;
-        }
-
-        public IMessage ReceiveNoWait()
-        {
-            IMessage nmsMessage = null;
-
-            if(messageQueue != null)
-            {
-                nmsMessage = reader.Receive(zeroTimeout);
-                nmsMessage = TransformMessage(nmsMessage);
-            }
-
-            return nmsMessage;
-        }
-
-        public void Dispose()
+        private void OnConnectionStateChange(object sender, Connection.StateChangeEventArgs e)
         {
-            Close();
-        }
-
-        public void Close()
-        {
-            StopAsyncDelivery();
-            if(messageQueue != null)
+            if(e.CurrentState == Connection.ConnectionState.Starting)
             {
-                messageQueue.Dispose();
-                messageQueue = null;
+                if(listenerCount > 0)
+                {
+                    StartAsyncDelivery();
+                }
             }
-        }
-
-        protected virtual void StopAsyncDelivery()
-        {
-            if(asyncDelivery.CompareAndSet(true, false))
+            else if(e.CurrentState == Connection.ConnectionState.Stopping)
             {
-                if(null != asyncDeliveryThread)
+                if(listenerCount > 0)
                 {
-                    Tracer.Info("Stopping async delivery thread.");
-                    pause.Set();
-                    if(!asyncDeliveryThread.Join(10000))
-                    {
-                        Tracer.Info("Aborting async delivery thread.");
-                        asyncDeliveryThread.Abort();
-                    }
-
-                    asyncDeliveryThread = null;
-                    Tracer.Info("Async delivery thread stopped.");
+                    StopAsyncDelivery();
                 }
             }
         }
 
+        private Thread asyncDeliveryThread = null;
+        private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+        TimeSpan dispatchingTimeout = TimeSpan.FromMilliseconds(5000); // receive poll / join wait of 5 s; new TimeSpan(5000) would be 5000 ticks = 0.5 ms
+
         protected virtual void StartAsyncDelivery()
         {
             if(asyncDelivery.CompareAndSet(false, true))
@@ -199,7 +154,7 @@ namespace Apache.NMS.MSMQ
             {
                 try
                 {
-                    IMessage message = Receive();
+                    IMessage message = Receive(dispatchingTimeout);
                     if(asyncDelivery.Value && message != null)
                     {
                         try
@@ -215,6 +170,7 @@ namespace Apache.NMS.MSMQ
                 catch(ThreadAbortException ex)
                 {
                     Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message);
+
                     break;
                 }
                 catch(Exception ex)
@@ -225,11 +181,104 @@ namespace Apache.NMS.MSMQ
             Tracer.Info("Stopping dispatcher thread consumer: " + this);
         }
 
+        /// <summary>Clears the asyncDelivery flag and waits up to dispatchingTimeout for the dispatcher thread to exit.</summary>
+        protected virtual void StopAsyncDelivery()
+        {
+            if(asyncDelivery.CompareAndSet(true, false))
+            {
+                if(null != asyncDeliveryThread)
+                {
+                    // Thread.Interrupt and Thread.Abort do not interrupt Receive
+                    // instructions. Attempting to abort the thread and joining
+                    // will result in a phantom background thread, which may
+                    // ultimately consume a message before actually stopping.
+                    Tracer.Info("Waiting for thread to complete aborting.");
+                    bool exited = asyncDeliveryThread.Join(dispatchingTimeout);
+                    asyncDeliveryThread = null;
+                    // Join returns false when the wait timed out; do not report a stop in that case.
+                    Tracer.Info(exited ? "Async delivery thread stopped." : "Async delivery thread did not stop in time.");
+                }
+            }
+        }
+
+
         protected virtual void HandleAsyncException(Exception e)
         {
             session.Connection.HandleException(e);
         }
 
+        #endregion
+
+        #region Receive (synchronous)
+
+        public IMessage Receive()
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                nmsMessage = reader.Receive();
+                nmsMessage = TransformMessage(nmsMessage);
+            }
+
+            return nmsMessage;
+        }
+
+        /// <summary>Receives a message, waiting at most <paramref name="timeout"/>; an MSMQ IOTimeout is
+        /// swallowed (the result then comes from TransformMessage(null)), other MessageQueueExceptions propagate.</summary>
+        public IMessage Receive(TimeSpan timeout)
+        {
+            IMessage nmsMessage = null;
+            if(messageQueue != null)
+            {
+                try
+                {
+                    nmsMessage = reader.Receive(timeout);
+                }
+                catch(MessageQueueException ex)
+                {
+                    if(ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
+                    {
+                        throw; // bare rethrow keeps the original stack trace ("throw ex" resets it)
+                    }
+                }
+                nmsMessage = TransformMessage(nmsMessage);
+            }
+            return nmsMessage;
+        }
+
+        public IMessage ReceiveNoWait()
+        {
+            return Receive(zeroTimeout);
+        }
+
+        #endregion
+
+        #region Close & dispose
+
+        public void Dispose()
+        {
+            Close();
+        }
+
+        public void Close()
+        {
+            if(listenerCount > 0)
+            {
+                session.Connection.ConnectionStateChange -= OnConnectionStateChange;
+
+                StopAsyncDelivery();
+            }
+
+            if(messageQueue != null)
+            {
+                messageQueue.Dispose();
+                messageQueue = null;
+            }
+        }
+
+        #endregion
+
         protected virtual IMessage TransformMessage(IMessage message)
         {
             IMessage transformed = message;

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/MessageProducer.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
index 8690fc4..4a0bed4 100644
--- a/src/main/csharp/MessageProducer.cs
+++ b/src/main/csharp/MessageProducer.cs
@@ -50,41 +50,47 @@ namespace Apache.NMS.MSMQ
             this.destination = destination;
             if(destination != null)
             {
-                messageQueue = openMessageQueue(destination);
+                messageQueue = OpenMessageQueue(destination);
             }
         }
 
-        private MessageQueue openMessageQueue(Destination dest)
+        private MessageQueue OpenMessageQueue(Destination dest)
         {
-            MessageQueue rc = null;
-            try
+            Queue queue = dest as Queue;
+
+            MessageQueue mq = queue.MSMQMessageQueue;
+
+            if(mq == null)
             {
-                if(!MessageQueue.Exists(dest.Path))
-                {
-                    // create the new message queue and make it transactional
-                    rc = MessageQueue.Create(dest.Path, session.Transacted);
-                    this.destination.Path = rc.Path;
-                }
-                else
+                try
                 {
-                    rc = new MessageQueue(dest.Path);
-                    this.destination.Path = rc.Path;
-                    if(!rc.CanWrite)
+                    if(!Queue.Exists(dest.Path))
                     {
-                        throw new NMSSecurityException("Do not have write access to: " + dest);
+                        // create the new message queue and make it transactional
+                        mq = MessageQueue.Create(dest.Path, session.Transacted);
+                        this.destination = new Queue(mq);
+                    }
+                    else
+                    {
+                        mq = new MessageQueue(dest.Path);
+                        this.destination = new Queue(mq);
+                        if(!mq.CanWrite)
+                        {
+                            throw new NMSSecurityException("Do not have write access to: " + dest);
+                        }
                     }
                 }
-            }
-            catch(Exception e)
-            {
-                if(rc != null)
+                catch(Exception e)
                 {
-                    rc.Dispose();
+                    if(mq != null)
+                    {
+                        mq.Dispose();
+                    }
+                
+                    throw new NMSException(e.Message + ": " + dest, e);
                 }
-
-                throw new NMSException(e.Message + ": " + dest, e);
             }
-            return rc;
+            return mq;
         }
 
         public void Send(IMessage message)
@@ -111,18 +117,28 @@ namespace Apache.NMS.MSMQ
                 // Locate the MSMQ Queue we will be sending to
                 if(messageQueue != null)
                 {
+                    if(destination == null)
+                    {
+                        throw new InvalidDestinationException("This producer can only be used to send to: " + destination);
+                    }
+
                     if(destination.Equals(this.destination))
                     {
                         mq = messageQueue;
                     }
                     else
                     {
-                        throw new NMSException("This producer can only be used to send to: " + destination);
+                        throw new NotSupportedException("This producer can only be used to send to: " + destination);
                     }
                 }
                 else
                 {
-                    mq = openMessageQueue((Destination) destination);
+                    if(destination == null)
+                    {
+                        throw new NotSupportedException();
+                    }
+
+                    mq = OpenMessageQueue((Destination) destination);
                 }
 
                 if(this.ProducerTransformer != null)
@@ -147,7 +163,7 @@ namespace Apache.NMS.MSMQ
                     // TODO: message.NMSMessageId =
                 }
 
-                // Convert the Mesasge into a MSMQ message
+                // Convert the Message into a MSMQ message
                 Message msg = session.MessageConverter.ToMsmqMessage(message);
 
                 if(mq.Transactional)
@@ -179,6 +195,11 @@ namespace Apache.NMS.MSMQ
                     mq.Send(msg);
                 }
 
+                message.NMSMessageId = msg.Id;
+                if(message.NMSCorrelationID == null)
+                {
+                    message.NMSCorrelationID = msg.CorrelationId;
+                }
             }
             finally
             {

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/Queue.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Queue.cs b/src/main/csharp/Queue.cs
index 30efcd9..4444a1b 100644
--- a/src/main/csharp/Queue.cs
+++ b/src/main/csharp/Queue.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 using System;
+using System.Messaging;
 
 namespace Apache.NMS.MSMQ
 {
@@ -24,17 +25,69 @@ namespace Apache.NMS.MSMQ
 	/// </summary>
 	public class Queue : Destination, IQueue
 	{
-
 		public Queue()
 			: base()
 		{
 		}
 
+		public Queue(MessageQueue messageQueue)
+			: base()
+		{
+			this.messageQueue = messageQueue;
+            Path = messageQueue.Path;
+		}
+
 		public Queue(String name)
 			: base(name)
 		{
+			if(string.IsNullOrEmpty(name))
+			{
+				messageQueue = null;
+			}
+            else
+            {
+                try
+                {
+                    messageQueue = new MessageQueue(name);
+                }
+                catch(Exception /*ex*/)
+                {
+                    // Excerpt from Microsoft documentation for MessageQueue.Exists :
+                    // (@https://msdn.microsoft.com/fr-fr/library/system.messaging.messagequeue.exists(v=vs.110).aspx)
+                    // Exists(String) is an expensive operation. Use it only when it is necessary within the application.
+                    // ---
+                    // Therefore, we won't check for existence of the queue before attempting to access it.
+
+                    //if(!Exists(name))
+                    //{
+                        // Excerpt from the Oracle JMS JavaDoc for Session.createQueue :
+                        // (@https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createQueue-java.lang.String-)
+                        // Note that this method simply creates an object that encapsulates the name of a queue. It does
+                        // not create the physical queue in the JMS provider. JMS does not provide a method to create the
+                        // physical queue, since this would be specific to a given JMS provider. Creating a physical queue
+                        // is provider-specific and is typically an administrative task performed by an administrator,
+                        // though some providers may create them automatically when needed. The one exception to this is
+                        // the creation of a temporary queue, which is done using the createTemporaryQueue method.
+                        // ---
+                        // Therefore, we should throw an NMSException if the queue does not exist.
+                        // ---
+                        // BUT, to keep it compatible with the initial implementation of MessageProducer, which attempts
+                        // to create non pre-existing queues, we keep it silent..
+
+                        // throw new NMSException("Message queue \"" + name + "\" does not exist", ex);
+                    //}
+
+                    // throw new NMSException("Cannot access message queue \"" + name + "\"", ex);
+                }
+            }
 		}
 
+        private MessageQueue messageQueue;
+        public MessageQueue MSMQMessageQueue
+        {
+            get { return messageQueue; }
+        }
+
 		override public DestinationType DestinationType
 		{
 			get
@@ -54,7 +107,25 @@ namespace Apache.NMS.MSMQ
 			return new Queue(name);
 		}
 
+        public static bool Exists(string name)
+        {
+            try
+            {
+                return MessageQueue.Exists(name);
+            }
+            catch(InvalidOperationException)
+            {
+                // Excerpt from Microsoft documentation for MessageQueue.Exists :
+                // (@https://msdn.microsoft.com/fr-fr/library/system.messaging.messagequeue.exists(v=vs.110).aspx)
+                // InvalidOperationException: The application used format name syntax when verifying queue existence. 
+                // ---
+                // The Exists(String) method does not support the FormatName prefix.
+                // No method exists to determine whether a queue with a specified format name exists.
 
+                // We'll assume the queue exists at this point
+                return true;
+            }
+        }
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/QueueBrowser.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/QueueBrowser.cs b/src/main/csharp/QueueBrowser.cs
index 3ff795d..9ccc2f9 100644
--- a/src/main/csharp/QueueBrowser.cs
+++ b/src/main/csharp/QueueBrowser.cs
@@ -1,164 +1,172 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections;
-using System.Messaging;
-using Apache.NMS;
-using Apache.NMS.Util;
-using Apache.NMS.MSMQ.Readers;
-
-namespace Apache.NMS.MSMQ
-{
-	public class QueueBrowser : Apache.NMS.IQueueBrowser
-	{
-		private bool closed = false;
-		private bool disposed = false;
-
-        private readonly Session session;
-        private MessageQueue messageQueue;
-
-        private string selector;
-
-        private IMessageReader reader;
-        
-		public QueueBrowser(Session session, MessageQueue messageQueue)
-            : this(session, messageQueue, null)
-		{
-		}
-
-		public QueueBrowser(Session session, MessageQueue messageQueue,
-            string selector)
-		{
-            this.session = session;
-            this.messageQueue = messageQueue;
-            if(null != this.messageQueue)
-            {
-                this.messageQueue.MessageReadPropertyFilter.SetAll();
-            }
-
-            reader = MessageReaderUtil.CreateMessageReader(
-                messageQueue, session.MessageConverter, selector);
-		}
-
-		~QueueBrowser()
-		{
-			Dispose(false);
-		}
-
-		#region IDisposable Members
-
-		///<summary>
-		/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
-		///</summary>
-		public void Dispose()
-		{
-			Dispose(true);
-			GC.SuppressFinalize(this);
-		}
-
-		protected void Dispose(bool disposing)
-		{
-			if(disposed)
-			{
-				return;
-			}
-
-			if(disposing)
-			{
-				// Dispose managed code here.
-			}
-
-			try
-			{
-				Close();
-			}
-			catch
-			{
-				// Ignore errors.
-			}
-
-			disposed = true;
-		}
-
-		#endregion
-
-		public void  Close()
-		{
-            if(messageQueue != null)
-            {
-                messageQueue.Dispose();
-                messageQueue = null;
-            }
-			closed = true;
-		}
-
-		public string MessageSelector
-		{
-			get { return selector; }
-		}
-
-		public IQueue Queue
-		{
-			get { return new Queue(this.messageQueue.Path); }
-		}
-
-		internal class Enumerator : IEnumerator
-		{
-			private readonly Session session;
-			private readonly MessageEnumerator innerEnumerator;
-            private readonly IMessageReader reader;
-
-			public Enumerator(Session session, MessageQueue messageQueue,
-                IMessageReader reader)
-			{
-				this.session = session;
-				this.innerEnumerator = messageQueue.GetMessageEnumerator2();
-                this.reader = reader;
-			}
-
-			public object Current
-			{
-				get
-				{
-					return this.session.MessageConverter.ToNmsMessage(this.innerEnumerator.Current);
-				}
-			}
-
-			public bool MoveNext()
-			{
-                while(this.innerEnumerator.MoveNext())
-                {
-				    if(reader.Matches(this.innerEnumerator.Current))
-                    {
-                        return true;
-                    }
-                }
-                return false;
-			}
-
-			public void Reset()
-			{
-				this.innerEnumerator.Reset();
-			}
-		}
-
-		public IEnumerator GetEnumerator()
-		{
-			return new Enumerator(this.session, this.messageQueue, this.reader);
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections;
+using System.Messaging;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.MSMQ.Readers;
+
+namespace Apache.NMS.MSMQ
+{
+	public class QueueBrowser : Apache.NMS.IQueueBrowser
+	{
+		private bool disposed = false;
+
+        private readonly Session session;
+        private MessageQueue messageQueue;
+
+        private string selector;
+
+        private IMessageReader reader;
+        
+		public QueueBrowser(Session session, MessageQueue messageQueue)
+            : this(session, messageQueue, null)
+		{
+		}
+
+		public QueueBrowser(Session session, MessageQueue messageQueue,
+            string selector)
+		{
+            this.session = session;
+            this.messageQueue = messageQueue;
+            if(null != this.messageQueue)
+            {
+                this.messageQueue.MessageReadPropertyFilter.SetAll();
+            }
+            this.selector = selector;
+
+            reader = MessageReaderUtil.CreateMessageReader(
+                messageQueue, session.MessageConverter, selector);
+		}
+
+		~QueueBrowser()
+		{
+			Dispose(false);
+		}
+
+		#region IDisposable Members
+
+		///<summary>
+		/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
+		///</summary>
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected void Dispose(bool disposing)
+		{
+			if(disposed)
+			{
+				return;
+			}
+
+			if(disposing)
+			{
+				// Dispose managed code here.
+			}
+
+			try
+			{
+				Close();
+			}
+			catch
+			{
+				// Ignore errors.
+			}
+
+			disposed = true;
+		}
+
+		#endregion
+
+		public void  Close()
+		{
+            if(messageQueue != null)
+            {
+                messageQueue.Dispose();
+                messageQueue = null;
+            }
+		}
+
+		public string MessageSelector
+		{
+			get { return selector; }
+		}
+
+		public IQueue Queue
+		{
+			get { return new Queue(this.messageQueue.Path); }
+		}
+
+		internal class Enumerator : IEnumerator, IDisposable
+		{
+			private Session session;
+			private MessageEnumerator innerEnumerator;
+            private IMessageReader reader;
+
+			public Enumerator(Session session, MessageQueue messageQueue,
+                IMessageReader reader)
+			{
+				this.session = session;
+				this.innerEnumerator = messageQueue.GetMessageEnumerator2();
+                this.reader = reader;
+			}
+
+			public object Current
+			{
+				get
+				{
+					return this.session.MessageConverter.ToNmsMessage(this.innerEnumerator.Current);
+				}
+			}
+
+			public bool MoveNext()
+			{
+                while(this.innerEnumerator.MoveNext())
+                {
+				    if(reader.Matches(this.innerEnumerator.Current))
+                    {
+                        return true;
+                    }
+                }
+                return false;
+			}
+
+			public void Reset()
+			{
+				this.innerEnumerator.Reset();
+			}
+
+            public void Dispose()
+            {
+                if(innerEnumerator != null)
+                {
+                    innerEnumerator.Close();
+                    innerEnumerator = null;
+                }
+            }
+		}
+
+		public IEnumerator GetEnumerator()
+		{
+			return new Enumerator(this.session, this.messageQueue, this.reader);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/Readers/AbstractMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/AbstractMessageReader.cs b/src/main/csharp/Readers/AbstractMessageReader.cs
index 7874696..dcb217d 100644
--- a/src/main/csharp/Readers/AbstractMessageReader.cs
+++ b/src/main/csharp/Readers/AbstractMessageReader.cs
@@ -1,126 +1,126 @@
-using System;
-using System.Messaging;
-using Apache.NMS.MSMQ;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.NMS.MSMQ.Readers
-{
-    /// <summary>
-    /// Abstract MSMQ message reader. Derived classes support various
-    /// message filtering methods.
-    /// </summary>
-	public abstract class AbstractMessageReader : IMessageReader
-	{
-        protected MessageQueue messageQueue;
-        protected IMessageConverter messageConverter;
-        protected IMessageConverterEx messageConverterEx;
-
-        /// <summary>
-        /// Constructor.
-        /// </summary>
-        /// <param name="messageQueue">The MSMQ message queue from which
-        /// messages will be read.</param>
-        /// <param name="messageConverter">A message converter for mapping
-        /// MSMQ messages to NMS messages.</param>
-        public AbstractMessageReader(MessageQueue messageQueue,
-            IMessageConverter messageConverter)
-        {
-            this.messageQueue = messageQueue;
-
-            this.messageConverter = messageConverter;
-            this.messageConverterEx = (messageConverter as IMessageConverterEx);
-        }
-
-        /// <summary>
-        /// Returns without removing (peeks) the first message in the queue
-        /// referenced by this MessageQueue matching the selection criteria.
-        /// The Peek method is synchronous, so it blocks the current thread
-        /// until a message becomes available.
-        /// </summary>
-        /// <returns>Peeked message.</returns>
-        public abstract IMessage Peek();
-
-        /// <summary>
-        /// Returns without removing (peeks) the first message in the queue
-        /// referenced by this MessageQueue matching the selection criteria.
-        /// The Peek method is synchronous, so it blocks the current thread
-        /// until a message becomes available or the specified time-out occurs.
-        /// </summary>
-        /// <param name="timeSpan">Reception time-out.</param>
-        /// <returns>Peeked message.</returns>
-        public abstract IMessage Peek(TimeSpan timeSpan);
-
-        /// <summary>
-        /// Receives the first message available in the queue referenced by
-        /// the MessageQueue matching the selection criteria.
-        /// This call is synchronous, and blocks the current thread of execution
-        /// until a message is available.
-        /// </summary>
-        /// <returns>Received message.</returns>
-        public abstract IMessage Receive();
-
-        /// <summary>
-        /// Receives the first message available in the queue referenced by the
-        /// MessageQueue matching the selection criteria, and waits until either
-        /// a message is available in the queue, or the time-out expires.
-        /// </summary>
-        /// <param name="timeSpan">Reception time-out.</param>
-        /// <returns>Received message.</returns>
-        public abstract IMessage Receive(TimeSpan timeSpan);
-
-        /// <summary>
-        /// Receives the first message available in the transactional queue
-        /// referenced by the MessageQueue matching the selection criteria.
-        /// This call is synchronous, and blocks the current thread of execution
-        /// until a message is available.
-        /// </summary>
-        /// <param name="transaction">Transaction.</param>
-        /// <returns>Received message.</returns>
-        public abstract IMessage Receive(MessageQueueTransaction transaction);
-
-        /// <summary>
-        /// Receives the first message available in the transactional queue
-        /// referenced by the MessageQueue matching the selection criteria,
-        /// and waits until either a message is available in the queue, or the
-        /// time-out expires.
-        /// </summary>
-        /// <param name="timeSpan">Reception time-out.</param>
-        /// <param name="transaction">Transaction.</param>
-        /// <returns>Received message.</returns>
-        public abstract IMessage Receive(TimeSpan timeSpan,
-            MessageQueueTransaction transaction);
-
-        /// <summary>
-        /// Checks if an MSMQ message matches the selection criteria.
-        /// </summary>
-        /// <param name="message">MSMQ message.</param>
-        /// <return>true if the message matches the selection criteria.</return>
-        public abstract bool Matches(Message message);
-
-        /// <summary>
-        /// Converts an MSMQ message to an NMS message, using the converter
-        /// specified at construction time.
-        /// </summary>
-        /// <param name="message">MSMQ message.</param>
-        /// <return>NMS message.</return>
-        protected IMessage Convert(Message message)
-        {
-            return message == null ? null : messageConverter.ToNmsMessage(message);
-        }
-	}
-}
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// Abstract MSMQ message reader. Derived classes support various
+    /// message filtering methods.
+    /// </summary>
+	public abstract class AbstractMessageReader : IMessageReader
+	{
+        protected MessageQueue messageQueue;
+        protected IMessageConverter messageConverter;
+        protected IMessageConverterEx messageConverterEx;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        public AbstractMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter)
+        {
+            this.messageQueue = messageQueue;
+
+            this.messageConverter = messageConverter;
+            this.messageConverterEx = (messageConverter as IMessageConverterEx);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public abstract IMessage Peek();
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public abstract IMessage Peek(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive();
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive(MessageQueueTransaction transaction);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public abstract IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction);
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>true if the message matches the selection criteria.</return>
+        public abstract bool Matches(Message message);
+
+        /// <summary>
+        /// Converts an MSMQ message to an NMS message, using the converter
+        /// specified at construction time.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>NMS message.</return>
+        protected IMessage Convert(Message message)
+        {
+            return message == null ? null : messageConverter.ToNmsMessage(message);
+        }
+	}
+}

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/7274a80a/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs b/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
index fad3d1a..bc8d832 100644
--- a/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
+++ b/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs
@@ -1,139 +1,139 @@
-using System;
-using System.Messaging;
-using Apache.NMS.MSMQ;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.NMS.MSMQ.Readers
-{
-    /// <summary>
-    /// MSMQ message reader, returning messages matching the specified
-    /// message identifier.
-    /// </summary>
-	public class ByCorrelationIdMessageReader : AbstractMessageReader
-	{
-        private string correlationId;
-
-        /// <summary>
-        /// Constructor.
-        /// </summary>
-        /// <param name="messageQueue">The MSMQ message queue from which
-        /// messages will be read.</param>
-        /// <param name="messageConverter">A message converter for mapping
-        /// MSMQ messages to NMS messages.</param>
-        /// <param name="correlationId">The correlation identifier of messages
-        /// to be read.</param>
-        public ByCorrelationIdMessageReader(MessageQueue messageQueue,
-            IMessageConverter messageConverter, string correlationId)
-            : base(messageQueue, messageConverter)
-        {
-            this.correlationId = correlationId;
-        }
-
-        /// <summary>
-        /// Returns without removing (peeks) the first message in the queue
-        /// referenced by this MessageQueue matching the selection criteria.
-        /// The Peek method is synchronous, so it blocks the current thread
-        /// until a message becomes available.
-        /// </summary>
-        /// <returns>Peeked message.</returns>
-        public override IMessage Peek()
-        {
-            return Convert(messageQueue.PeekByCorrelationId(correlationId));
-        }
-
-        /// <summary>
-        /// Returns without removing (peeks) the first message in the queue
-        /// referenced by this MessageQueue matching the selection criteria.
-        /// The Peek method is synchronous, so it blocks the current thread
-        /// until a message becomes available or the specified time-out occurs.
-        /// </summary>
-        /// <param name="timeSpan">Reception time-out.</param>
-        /// <returns>Peeked message.</returns>
-        public override IMessage Peek(TimeSpan timeSpan)
-        {
-            return Convert(messageQueue.PeekByCorrelationId(correlationId,
-                timeSpan));
-        }
-
-        /// <summary>
-        /// Receives the first message available in the queue referenced by
-        /// the MessageQueue matching the selection criteria.
-        /// This call is synchronous, and blocks the current thread of execution
-        /// until a message is available.
-        /// </summary>
-        /// <returns>Received message.</returns>
-        public override IMessage Receive()
-        {
-            return Convert(messageQueue.ReceiveByCorrelationId(correlationId));
-        }
-
-        /// <summary>
-        /// Receives the first message available in the queue referenced by the
-        /// MessageQueue matching the selection criteria, and waits until either
-        /// a message is available in the queue, or the time-out expires.
-        /// </summary>
-        /// <param name="timeSpan">Reception time-out.</param>
-        /// <returns>Received message.</returns>
-        public override IMessage Receive(TimeSpan timeSpan)
-        {
-            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
-                timeSpan));
-        }
-
-        /// <summary>
-        /// Receives the first message available in the transactional queue
-        /// referenced by the MessageQueue matching the selection criteria.
-        /// This call is synchronous, and blocks the current thread of execution
-        /// until a message is available.
-        /// </summary>
-        /// <param name="transaction">Transaction.</param>
-        /// <returns>Received message.</returns>
-        public override IMessage Receive(MessageQueueTransaction transaction)
-        {
-            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
-                transaction));
-        }
-
-        /// <summary>
-        /// Receives the first message available in the transactional queue
-        /// referenced by the MessageQueue matching the selection criteria,
-        /// and waits until either a message is available in the queue, or the
-        /// time-out expires.
-        /// </summary>
-        /// <param name="timeSpan">Reception time-out.</param>
-        /// <param name="transaction">Transaction.</param>
-        /// <returns>Received message.</returns>
-        public override IMessage Receive(TimeSpan timeSpan,
-            MessageQueueTransaction transaction)
-        {
-            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
-                timeSpan, transaction));
-        }
-
-        /// <summary>
-        /// Checks if an MSMQ message matches the selection criteria.
-        /// </summary>
-        /// <param name="message">MSMQ message.</param>
-        /// <return>true if the message matches the selection criteria.</return>
-        public override bool Matches(Message message)
-        {
-            // NB: case-sensitive match
-            return message.CorrelationId == correlationId;
-        }
-	}
-}
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning messages matching the specified
+    /// message identifier.
+    /// </summary>
+	public class ByCorrelationIdMessageReader : AbstractMessageReader
+	{
+        private string correlationId;
+
+        /// <summary>
+        /// Constructor.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="correlationId">The correlation identifier of messages
+        /// to be read.</param>
+        public ByCorrelationIdMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, string correlationId)
+            : base(messageQueue, messageConverter)
+        {
+            this.correlationId = correlationId;
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return Convert(messageQueue.PeekByCorrelationId(correlationId));
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.PeekByCorrelationId(correlationId,
+                timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+                timeSpan));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+                transaction));
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            return Convert(messageQueue.ReceiveByCorrelationId(correlationId,
+                timeSpan, transaction));
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>true if the message matches the selection criteria.</return>
+        public override bool Matches(Message message)
+        {
+            // NB: case-sensitive match
+            return message.CorrelationId == correlationId;
+        }
+	}
+}


Mime
View raw message