activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r1597145 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk: src/main/csharp/MessageConsumer.cs src/main/csharp/Session.cs src/test/csharp/MessageConsumerTest.cs vs2008-stomp-test.csproj
Date Fri, 23 May 2014 18:26:34 GMT
Author: jgomes
Date: Fri May 23 18:26:34 2014
New Revision: 1597145

URL: http://svn.apache.org/r1597145
Log:
Copy the ignoreExpiration implementation from the OpenWire provider.
Fixes [AMQNET-478]. (See https://issues.apache.org/jira/browse/AMQNET-478)

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs?rev=1597145&r1=1597144&r2=1597145&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/MessageConsumer.cs Fri May 23 18:26:34 2014
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 using System;
+using System.Collections.Specialized;
 using System.Threading;
 using System.Collections.Generic;
 using Apache.NMS.Stomp.Commands;
@@ -58,14 +59,45 @@ namespace Apache.NMS.Stomp
         private IRedeliveryPolicy redeliveryPolicy;
         private Exception failureError;
 
-        // Constructor internal to prevent clients from creating an instance.
-        internal MessageConsumer(Session session, ConsumerInfo info)
+		// Constructor internal to prevent clients from creating an instance.
+        internal MessageConsumer(Session session, ConsumerId id, Destination destination,
string name, string selector, int prefetch, bool noLocal)
         {
-            this.session = session;
-            this.info = info;
+			if(destination == null)
+			{
+				throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+			}
+			
+			this.session = session;
             this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
             this.messageTransformation = this.session.Connection.MessageTransformation;
-        }
+
+			this.info = new ConsumerInfo();
+			this.info.ConsumerId = id;
+			this.info.Destination = Destination.Transform(destination);
+			this.info.SubscriptionName = name;
+			this.info.Selector = selector;
+			this.info.PrefetchSize = prefetch;
+			this.info.MaximumPendingMessageLimit = session.Connection.PrefetchPolicy.MaximumPendingMessageLimit;
+			this.info.NoLocal = noLocal;
+			this.info.DispatchAsync = session.DispatchAsync;
+			this.info.Retroactive = session.Retroactive;
+			this.info.Exclusive = session.Exclusive;
+			this.info.Priority = session.Priority;
+			this.info.AckMode = session.AcknowledgementMode;
+
+			// If the destination contained a URI query, then use it to set public properties
+			// on the ConsumerInfo
+			if(destination.Options != null)
+			{
+				// Get options prefixed with "consumer.*"
+				StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
+				// Extract out custom extension options "consumer.nms.*"
+				StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
+
+				URISupport.SetProperties(this.info, options);
+				URISupport.SetProperties(this, customConsumerOptions, "nms.");
+			}
+		}
 
         ~MessageConsumer()
         {
@@ -79,7 +111,12 @@ namespace Apache.NMS.Stomp
             get { return info.ConsumerId; }
         }
 
-        public int PrefetchSize
+		public ConsumerInfo ConsumerInfo
+		{
+			get { return this.info; }
+		}
+		
+		public int PrefetchSize
         {
             get { return this.info.PrefetchSize; }
         }
@@ -90,18 +127,26 @@ namespace Apache.NMS.Stomp
             set { this.redeliveryPolicy = value; }
         }
 
-        private ConsumerTransformerDelegate consumerTransformer;
-        public ConsumerTransformerDelegate ConsumerTransformer
-        {
-            get { return this.consumerTransformer; }
-            set { this.consumerTransformer = value; }
-        }
+		// Custom Options
+		private bool ignoreExpiration = false;
+		public bool IgnoreExpiration
+		{
+			get { return ignoreExpiration; }
+			set { ignoreExpiration = value; }
+		}
 
-        #endregion
+		#endregion
 
         #region IMessageConsumer Members
 
-        public event MessageListener Listener
+		private ConsumerTransformerDelegate consumerTransformer;
+		public ConsumerTransformerDelegate ConsumerTransformer
+		{
+			get { return this.consumerTransformer; }
+			set { this.consumerTransformer = value; }
+		}
+
+		public event MessageListener Listener
         {
             add
             {
@@ -424,7 +469,7 @@ namespace Apache.NMS.Stomp
 
                             try
                             {
-                                bool expired = message.IsExpired();
+								bool expired = (!IgnoreExpiration && message.IsExpired());
 
                                 if(!expired)
                                 {
@@ -548,7 +593,7 @@ namespace Apache.NMS.Stomp
                 {
                     return null;
                 }
-                else if(dispatch.Message.IsExpired())
+				else if(!IgnoreExpiration && dispatch.Message.IsExpired())
                 {
                     Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId,
dispatch.Message.MessageId);
 

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs?rev=1597145&r1=1597144&r2=1597145&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Session.cs Fri May 23 18:26:34 2014
@@ -416,40 +416,47 @@ namespace Apache.NMS.Stomp
                 throw new InvalidDestinationException("Cannot create a Consumer with a Null
destination");
             }
 
-            ConsumerInfo command = CreateConsumerInfo(destination, selector);
-            command.NoLocal = noLocal;
-            ConsumerId consumerId = command.ConsumerId;
-            MessageConsumer consumer = null;
+			int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch;
 
-            // Registered with Connection before we register at the broker.
-            connection.addDispatcher(consumerId, this);
+			if(destination.IsTopic)
+			{
+				prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
+			}
+			else if(destination.IsQueue)
+			{
+				prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
+			}
+			
+            MessageConsumer consumer = null;
 
             try
             {
-                consumer = new MessageConsumer(this, command);
+	            Destination dest = destination as Destination;
+				consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector, prefetchSize,
noLocal);
                 consumer.ConsumerTransformer = this.ConsumerTransformer;
-                consumers[consumerId] = consumer;
+				this.AddConsumer(consumer);
 
-                if(this.Started)
+				// lets register the consumer first in case we start dispatching messages immediately
+				this.Connection.SyncRequest(consumer.ConsumerInfo);
+
+				if(this.Started)
                 {
                     consumer.Start();
                 }
-
-                // lets register the consumer first in case we start dispatching messages immediately
-                this.Connection.SyncRequest(command);
-
-                return consumer;
             }
             catch(Exception)
             {
                 if(consumer != null)
                 {
+					this.RemoveConsumer(consumer);
                     consumer.Close();
                 }
 
                 throw;
             }
-        }
+
+			return consumer;
+		}
 
         public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string
selector, bool noLocal)
         {
@@ -458,33 +465,26 @@ namespace Apache.NMS.Stomp
                 throw new InvalidDestinationException("Cannot create a Consumer with a Null
destination");
             }
 
-            ConsumerInfo command = CreateConsumerInfo(destination, selector);
-            ConsumerId consumerId = command.ConsumerId;
-            command.SubscriptionName = name;
-            command.NoLocal = noLocal;
-            command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch;
             MessageConsumer consumer = null;
 
-            // Registered with Connection before we register at the broker.
-            connection.addDispatcher(consumerId, this);
-
             try
             {
-                consumer = new MessageConsumer(this, command);
-                consumer.ConsumerTransformer = this.ConsumerTransformer;
-                consumers[consumerId] = consumer;
+				Destination dest = destination as Destination;
+				consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector, this.connection.PrefetchPolicy.DurableTopicPrefetch,
noLocal);
+				consumer.ConsumerTransformer = this.ConsumerTransformer;
+				this.AddConsumer(consumer);
+				this.connection.SyncRequest(consumer.ConsumerInfo);
 
-                if(this.Started)
+				if(this.Started)
                 {
                     consumer.Start();
                 }
-
-                this.connection.SyncRequest(command);
             }
             catch(Exception)
             {
                 if(consumer != null)
                 {
+					this.RemoveConsumer(consumer);
                     consumer.Close();
                 }
 
@@ -633,7 +633,26 @@ namespace Apache.NMS.Stomp
 
         #endregion
 
-        public void DoSend( Message message, MessageProducer producer, TimeSpan sendTimeout
)
+		public void AddConsumer(MessageConsumer consumer)
+		{
+			if(!this.closing)
+			{
+				// Registered with Connection before we register at the broker.
+				consumers[consumer.ConsumerId] = consumer;
+				connection.addDispatcher(consumer.ConsumerId, this);
+			}
+		}
+
+		public void RemoveConsumer(MessageConsumer consumer)
+		{
+			connection.removeDispatcher(consumer.ConsumerId);
+			if(!this.closing)
+			{
+				consumers.Remove(consumer.ConsumerId);
+			}
+		}
+
+		public void DoSend(Message message, MessageProducer producer, TimeSpan sendTimeout)
         {
             Message msg = message;
 
@@ -699,52 +718,10 @@ namespace Apache.NMS.Stomp
             }
         }
 
-        protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string
selector)
-        {
-            ConsumerInfo answer = new ConsumerInfo();
-            ConsumerId id = new ConsumerId();
-            id.ConnectionId = info.SessionId.ConnectionId;
-            id.SessionId = info.SessionId.Value;
-            id.Value = Interlocked.Increment(ref consumerCounter);
-            answer.ConsumerId = id;
-            answer.Destination = Destination.Transform(destination);
-            answer.Selector = selector;
-            answer.Priority = this.Priority;
-            answer.Exclusive = this.Exclusive;
-            answer.DispatchAsync = this.DispatchAsync;
-            answer.Retroactive = this.Retroactive;
-            answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
-            answer.AckMode = this.AcknowledgementMode;
-
-            if(destination is ITopic || destination is ITemporaryTopic)
-            {
-                answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
-            }
-            else if(destination is IQueue || destination is ITemporaryQueue)
-            {
-                answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
-            }
-
-            // If the destination contained a URI query, then use it to set public properties
-            // on the ConsumerInfo
-            Destination amqDestination = destination as Destination;
-            if(amqDestination != null && amqDestination.Options != null)
-            {
-                StringDictionary options = URISupport.GetProperties(amqDestination.Options,
"consumer.");
-                URISupport.SetProperties(answer, options);
-            }
-
-            return answer;
-        }
-
         protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
         {
             ProducerInfo answer = new ProducerInfo();
-            ProducerId id = new ProducerId();
-            id.ConnectionId = info.SessionId.ConnectionId;
-            id.SessionId = info.SessionId.Value;
-            id.Value = Interlocked.Increment(ref producerCounter);
-            answer.ProducerId = id;
+            answer.ProducerId = GetNextProducerId();
             answer.Destination = Destination.Transform(destination);
 
             // If the destination contained a URI query, then use it to set public
@@ -759,7 +736,27 @@ namespace Apache.NMS.Stomp
             return answer;
         }
 
-        public void Stop()
+		public ConsumerId GetNextConsumerId()
+		{
+			ConsumerId id = new ConsumerId();
+			id.ConnectionId = info.SessionId.ConnectionId;
+			id.SessionId = info.SessionId.Value;
+			id.Value = Interlocked.Increment(ref consumerCounter);
+
+			return id;
+		}
+
+		public ProducerId GetNextProducerId()
+		{
+			ProducerId id = new ProducerId();
+			id.ConnectionId = info.SessionId.ConnectionId;
+			id.SessionId = info.SessionId.Value;
+			id.Value = Interlocked.Increment(ref producerCounter);
+
+			return id;
+		}
+
+		public void Stop()
         {
             if(this.executor != null)
             {

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs?rev=1597145&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/MessageConsumerTest.cs Fri May 23 18:26:34 2014
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using Apache.NMS.Test;
+using NUnit.Framework;
+using Apache.NMS.Stomp.Commands;
+using System;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.Stomp.Test
+{
+    public enum ExpirationOptions
+    {
+        DEFAULT,
+        IGNORE,
+        DO_NOT_IGNORE
+    }
+
+    [TestFixture]
+    public class MessageConsumerTest : NMSTestSupport
+    {
+        protected static string DESTINATION_NAME = "queue://TEST.MessageConsumerTestDestination";
+        protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId";
+
+        private CountDownLatch doneLatch;
+        private int counter;
+        private String errorMessage;
+
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+
+            this.doneLatch = new CountDownLatch(1);
+            this.counter = 0;
+            this.errorMessage = null;
+        }
+
+        [Test]
+        public void TestBadSelectorDoesNotCloseConnection()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                using(ISession sender = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IDestination destination = sender.CreateTemporaryQueue();
+
+                    IMessageProducer producer = sender.CreateProducer(destination);
+                    ITextMessage goodMsg = sender.CreateTextMessage("testGood");
+                    producer.Send(goodMsg);
+
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    connection.Start();
+                    Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
+
+                    try
+                    {
+                        ISession badListenerSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                        badListenerSession.CreateConsumer(destination, "badSelector;too");
+                        Assert.Fail("Exception expected.");
+                    }
+                    catch(Exception e)
+                    {
+                        Tracer.DebugFormat("Caught Ex: {0}", e);
+                    }
+
+                    ITextMessage failMsg = sender.CreateTextMessage("testFail");
+                    producer.Send(failMsg);
+                    Assert.NotNull(consumer.Receive(TimeSpan.FromMilliseconds(5000)));
+                }
+            }
+        }
+
+        [Test]
+        public void TestAsyncDispatchExceptionRedelivers()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                using(ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME)
as IQueue;
+
+                    using(IMessageProducer producer = session.CreateProducer(queue))
+                    {
+                        producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+                        producer.Send(producer.CreateTextMessage("First"));
+                        producer.Send(producer.CreateTextMessage("Second"));
+                    }
+
+                    using(IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        consumer.Listener += OnTestAsynchRedliversMessage;
+
+                        connection.Start();
+
+                        if(doneLatch.await(TimeSpan.FromSeconds(10)))
+                        {
+                            if(!String.IsNullOrEmpty(errorMessage))
+                            {
+                                Assert.Fail(errorMessage);
+                            }
+                        }
+                        else
+                        {
+                            Assert.Fail("Timeout waiting for async message delivery to complete.");
+                        }
+                    }
+                }
+            }
+        }
+
+        private void OnTestAsynchRedliversMessage(IMessage msg)
+        {
+            counter++;
+            try
+            {
+                ITextMessage message = msg as ITextMessage;
+                switch(counter)
+                {
+                    case 1:
+                        Tracer.Debug("Got first Message: " + message.Text);
+                        Assert.AreEqual("First", message.Text);
+                        Assert.IsFalse(message.NMSRedelivered);
+                        break;
+                    case 2:
+                        Tracer.Debug("Got Second Message: " + message.Text);
+                        Assert.AreEqual("Second", message.Text);
+                        Assert.IsFalse(message.NMSRedelivered);
+                        throw new Exception("Ignore Me");
+                    case 3:
+                        Tracer.Debug("Got Third Message: " + message.Text);
+                        Assert.AreEqual("Second", message.Text);
+                        Assert.IsTrue(message.NMSRedelivered);
+                        doneLatch.countDown();
+                        break;
+                    default:
+                        errorMessage = "Got too many messages: " + counter;
+                        Tracer.Debug(errorMessage);
+                        doneLatch.countDown();
+                        break;
+                }
+            }
+            catch(Exception e)
+            {
+                if(e.Message.Equals("Ignore Me"))
+                {
+                    throw;
+                }
+                errorMessage = "Got exception: " + e.Message;
+                Tracer.Warn("Exception on Message Receive: " + e.Message);
+                doneLatch.countDown();
+            }
+        }
+
+        [Test]
+        public void ConsumeInTwoThreads()
+        {
+            ParameterizedThreadStart threadStart =
+                delegate(object o)
+                {
+                    IMessageConsumer consumer = (IMessageConsumer) o;
+                    IMessage message = consumer.Receive(TimeSpan.FromSeconds(2));
+                    Assert.IsNotNull(message);
+                };
+
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                connection.Start();
+                using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+                {
+                    IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME)
as IQueue;
+
+                    // enqueue 2 messages
+                    using(IMessageConsumer consumer = session.CreateConsumer(queue))
+                    using(IMessageProducer producer = session.CreateProducer(queue))
+                    {
+                        producer.DeliveryMode = MsgDeliveryMode.Persistent;
+                        producer.Send(producer.CreateMessage());
+                        producer.Send(producer.CreateMessage());
+                        session.Commit();
+
+                        // receive first using a dedicated thread. This works
+                        Thread thread = new Thread(threadStart);
+                        thread.Start(consumer);
+                        thread.Join();
+                        session.Commit();
+
+                        // receive second using main thread. This FAILS
+                        IMessage message = consumer.Receive(TimeSpan.FromSeconds(2)); //
throws System.Threading.AbandonedMutexException
+                        Assert.IsNotNull(message);
+                        session.Commit();
+                    }
+                }
+            }
+        }
+
+        [Test]
+        public void TestReceiveIgnoreExpirationMessage(
+            [Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+                AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+            AcknowledgementMode ackMode,
+            [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+            MsgDeliveryMode deliveryMode,
+            [Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
+            ExpirationOptions expirationOption)
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                connection.Start();
+                using(Session session = connection.CreateSession(ackMode) as Session)
+                {
+                    string destinationName = DESTINATION_NAME;
+
+                    if(ExpirationOptions.IGNORE == expirationOption)
+                    {
+                        destinationName += "?consumer.nms.ignoreExpiration=true";
+                    }
+                    else if(ExpirationOptions.DO_NOT_IGNORE == expirationOption)
+                    {
+                        destinationName += "?consumer.nms.ignoreExpiration=false";
+                    }
+
+                    try
+                    {
+                        IDestination destination = SessionUtil.GetDestination(session, destinationName);
+
+                        using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                        using(IMessageProducer producer = session.CreateProducer(destination))
+                        {
+                            producer.DeliveryMode = deliveryMode;
+
+                            string msgText = string.Format("ExpiredMessage: {0}", Guid.NewGuid().ToString());
+
+                            TextMessage msg = session.CreateTextMessage(msgText) as TextMessage;
+
+                            // Give it two seconds to live.
+                            msg.NMSTimeToLive = TimeSpan.FromMilliseconds(2000);
+
+                            producer.Send(msg);
+
+                            if(AcknowledgementMode.Transactional == ackMode)
+                            {
+                                session.Commit();
+                            }
+
+                            // Wait for four seconds before processing it.  The broker will have sent it to our local
+                            // client dispatch queue, but we won't attempt to process the message until it has had
+                            // a chance to expire within our internal queue system.
+                            Thread.Sleep(4000);
+
+                            TextMessage rcvMsg = consumer.ReceiveNoWait() as TextMessage;
+
+                            if(ExpirationOptions.IGNORE == expirationOption)
+                            {
+                                Assert.IsNotNull(rcvMsg, "Did not receive expired message.");
+                                rcvMsg.Acknowledge();
+
+                                Assert.AreEqual(msgText, rcvMsg.Text, "Message text does
not match.");
+                                Assert.IsTrue(rcvMsg.IsExpired());
+
+                                if(AcknowledgementMode.Transactional == ackMode)
+                                {
+                                    session.Commit();
+                                }
+                            }
+                            else
+                            {
+                                // Should not receive a message.
+                                Assert.IsNull(rcvMsg, "Received an expired message!");
+                            }
+
+                            consumer.Close();
+                            producer.Close();
+                        }
+                    }
+                    finally
+                    {
+                        try
+                        {
+                            // Ensure that Session resources on the Broker release transacted Consumers.
+                            session.Close();
+                            // Give the Broker some time to remove the subscriptions.
+                            Thread.Sleep(2000);
+                            SessionUtil.DeleteDestination(session, destinationName);
+                        }
+                        catch
+                        {
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=1597145&r1=1597144&r2=1597145&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Fri May 23 18:26:34 2014
@@ -68,6 +68,7 @@
       <HintPath>lib\NUnit\net-2.0\nunit.framework.dll</HintPath>
     </Reference>
     <Reference Include="System" />
+    <Reference Include="System.Data" />
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>
@@ -104,6 +105,7 @@
   <ItemGroup>
     <Compile Include="src\test\csharp\AMQNET383Test.cs" />
     <Compile Include="src\test\csharp\InvalidCredentialsTest.cs" />
+    <Compile Include="src\test\csharp\MessageConsumerTest.cs" />
     <Compile Include="src\test\csharp\StompHelperTest.cs" />
     <Compile Include="src\test\csharp\StompRedeliveryPolicyTest.cs" />
     <Compile Include="src\test\csharp\Threads\CompositeTaskRunnerTest.cs" />



Mime
View raw message