activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r984268 - in /activemq/activemq-dotnet: Apache.NMS.ActiveMQ/trunk/ Apache.NMS.ActiveMQ/trunk/src/main/csharp/ Apache.NMS.ActiveMQ/trunk/src/test/csharp/ Apache.NMS/trunk/src/main/csharp/Util/
Date Tue, 10 Aug 2010 23:53:33 GMT
Author: jgomes
Date: Tue Aug 10 23:53:33 2010
New Revision: 984268

URL: http://svn.apache.org/viewvc?rev=984268&view=rev
Log:
Added support for processing expired messages that were received from the broker.
Fixes [AMQNET-268]. (See https://issues.apache.org/activemq/browse/AMQNET-268)

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
      - copied, changed from r982821, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Tue Aug 10 23:53:33 2010
@@ -97,9 +97,14 @@ namespace Apache.NMS.ActiveMQ
             // If the destination contained a URI query, then use it to set public properties
             // on the ConsumerInfo
             if(destination.Options != null)
-            {				
+            {
+				// Get options prefixed with "consumer.*"
 				StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
+				// Extract out custom extension options "consumer.nms.*"
+				StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
+
 				URISupport.SetProperties(this.info, options);
+				URISupport.SetProperties(this, customConsumerOptions, "nms.");
             }
 		}
 
@@ -145,7 +150,15 @@ namespace Apache.NMS.ActiveMQ
         public long UnconsumedMessageCount
         {
             get { return this.unconsumedMessages.Count; }
-        }   
+        }
+
+		// Custom Options
+		private bool ignoreExpiration = false;
+		public bool IgnoreExpiration
+		{
+			get { return ignoreExpiration; }
+			set { ignoreExpiration = value; }
+		}
 
 		#endregion
 
@@ -335,7 +348,7 @@ namespace Apache.NMS.ActiveMQ
 				this.session.Connection.Oneway(removeCommand);
 				this.session = null;
 
-                Tracer.Debug("Consumer instnace Closed.");
+                Tracer.Debug("Consumer instance Closed.");
             }
 		}
 
@@ -541,7 +554,7 @@ namespace Apache.NMS.ActiveMQ
 
 							try
 							{
-								bool expired = message.IsExpired();
+								bool expired = (!IgnoreExpiration && message.IsExpired());
 
 								if(!expired)
 								{
@@ -672,7 +685,7 @@ namespace Apache.NMS.ActiveMQ
 				{
 					return null;
 				}
-				else if(dispatch.Message.IsExpired())
+				else if(!IgnoreExpiration && dispatch.Message.IsExpired())
 				{
 					Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
 
@@ -727,7 +740,7 @@ namespace Apache.NMS.ActiveMQ
 				return;
 			}
 
-			if(expired == true)
+			if(expired)
 			{
 				lock(this.dispatchedMessages)
 				{

Copied: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
(from r982821, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs)
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs?p2=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs&p1=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs&r1=982821&r2=984268&rev=984268&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs Tue Aug 10 23:53:33 2010
@@ -18,172 +18,116 @@
 using System.Threading;
 using Apache.NMS.Test;
 using NUnit.Framework;
+using Apache.NMS.ActiveMQ.Commands;
+using System;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ.Test
 {
-	[TestFixture]
-	public class VirtualTopicTest : NMSTestSupport
+	public enum ExpirationOptions
 	{
-		protected static string DESTINATION_NAME = "TestDestination";
-		protected static string PRODUCER_DESTINATION_NAME = "VirtualTopic." + DESTINATION_NAME;
-		protected static string CONSUMER_A_DESTINATION_NAME = "Consumer.A." + PRODUCER_DESTINATION_NAME;
-		protected static string CONSUMER_B_DESTINATION_NAME = "Consumer.B." + PRODUCER_DESTINATION_NAME;
-		protected static string TEST_CLIENT_ID = "VirtualTopicTestClientId";
+		DEFAULT,
+		IGNORE,
+		DO_NOT_IGNORE
+	}
 
-		protected const int totalMsgs = 5;
+	[TestFixture]
+	public class MessageConsumerTest : NMSTestSupport
+	{
+		protected static string DESTINATION_NAME = "queue://TestDestination";
+		protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId";
 
 		[Test]
-		public void SendReceiveVirtualTopicMessage(
+		public void TestReceiveIgnoreExpirationMessage(
 			[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
 				AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
 			AcknowledgementMode ackMode,
 			[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
-			MsgDeliveryMode deliveryMode)
+			MsgDeliveryMode deliveryMode,
+			[Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
+			ExpirationOptions expirationOption)
 		{
 			using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
 			{
 				connection.Start();
-				using(ISession session = connection.CreateSession(ackMode))
+				using(Session session = connection.CreateSession(ackMode) as Session)
 				{
-					using(IMessageConsumer consumerA = session.CreateConsumer(session.GetQueue(CONSUMER_A_DESTINATION_NAME)))
-					using(IMessageConsumer consumerB = session.CreateConsumer(session.GetQueue(CONSUMER_B_DESTINATION_NAME)))
-					using(IMessageProducer producer = session.CreateProducer(session.GetTopic(PRODUCER_DESTINATION_NAME)))
+					string destinationName = DESTINATION_NAME;
+
+					if(ExpirationOptions.IGNORE == expirationOption)
+					{
+						destinationName += "?consumer.nms.ignoreExpiration=true";
+					}
+					else if(ExpirationOptions.DO_NOT_IGNORE == expirationOption)
 					{
-						producer.RequestTimeout = receiveTimeout;
-						producer.DeliveryMode = deliveryMode;
+						destinationName += "?consumer.nms.ignoreExpiration=false";
+					}
 
-						for(int index = 0; index < totalMsgs; index++)
-						{
-							string msgText = "Message #" + index;
-							Tracer.Info("Sending: " + msgText);
-							producer.Send(session.CreateTextMessage(msgText));
-						}
+					try
+					{
+						IDestination destination = SessionUtil.GetDestination(session, destinationName);
 
-						if(AcknowledgementMode.Transactional == ackMode)
+						using(IMessageConsumer consumer = session.CreateConsumer(destination))
+						using(IMessageProducer producer = session.CreateProducer(destination))
 						{
-							session.Commit();
-						}
+							producer.RequestTimeout = receiveTimeout;
+							producer.DeliveryMode = deliveryMode;
 
-						for(int index = 0; index < totalMsgs; index++)
-						{
-							string msgText = "Message #" + index;
-							ITextMessage messageA = consumerA.Receive(receiveTimeout) as ITextMessage;
-							Assert.IsNotNull(messageA, "Did not receive message for consumer A.");
-							messageA.Acknowledge();
-							Tracer.Info("Received A: " + msgText);
-
-							ITextMessage messageB = consumerB.Receive(receiveTimeout) as ITextMessage;
-							Assert.IsNotNull(messageB, "Did not receive message for consumer B.");
-							messageB.Acknowledge();
-							Tracer.Info("Received B: " + msgText);
+							string msgText = "ExpiredMessage:" + Guid.NewGuid().ToString();
 
-							Assert.AreEqual(msgText, messageA.Text, "Message text A does not match.");
-							Assert.AreEqual(msgText, messageB.Text, "Message text B does not match.");
-						}
+							ActiveMQTextMessage msg = session.CreateTextMessage(msgText) as ActiveMQTextMessage;
 
-						if(AcknowledgementMode.Transactional == ackMode)
-						{
-							session.Commit();
-						}
-					}
+							// Give it two seconds to live.
+							msg.NMSTimeToLive = TimeSpan.FromMilliseconds(2000);
 
-                    // Give the Broker some time to remove the subscriptions.
-                    Thread.Sleep(2000);
+							producer.Send(msg);
 
-                    try
-                    {
-                        ((Session) session).DeleteDestination(session.GetQueue(CONSUMER_A_DESTINATION_NAME));
-                        ((Session) session).DeleteDestination(session.GetQueue(CONSUMER_B_DESTINATION_NAME));
-                    }
-                    catch
-                    {
-                    }
-				}
-			}
-		}
+							if(AcknowledgementMode.Transactional == ackMode)
+							{
+								session.Commit();
+							}
 
-		protected int receivedA;
-		protected int receivedB;
+							// Wait for four seconds before processing it.  The broker will have sent it to our local
+							// client dispatch queue, but we won't attempt to process the message until it has had
+							// a chance to expire within our internal queue system.
+							Thread.Sleep(4000);
 
-		[Test]
-		// Do not use listeners with transactional processing.
-		public void AsyncSendReceiveVirtualTopicMessage(
-			[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, AcknowledgementMode.DupsOkAcknowledge)]
-			AcknowledgementMode ackMode,
-			[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
-			MsgDeliveryMode deliveryMode)
-		{
-			receivedA = 0;
-			receivedB = 0;
+							ActiveMQTextMessage rcvMsg = consumer.ReceiveNoWait() as ActiveMQTextMessage;
 
-			using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
-			{
-				connection.Start();
-				using(ISession session = connection.CreateSession(ackMode))
-				{
-					using(IMessageConsumer consumerA = session.CreateConsumer(session.GetQueue(CONSUMER_A_DESTINATION_NAME)))
-					using(IMessageConsumer consumerB = session.CreateConsumer(session.GetQueue(CONSUMER_B_DESTINATION_NAME)))
-					using(IMessageProducer producer = session.CreateProducer(session.GetTopic(PRODUCER_DESTINATION_NAME)))
-					{
-						producer.RequestTimeout = receiveTimeout;
-						producer.DeliveryMode = deliveryMode;
+							if(ExpirationOptions.IGNORE == expirationOption)
+							{
+								Assert.IsNotNull(rcvMsg, "Did not receive expired message.");
+								rcvMsg.Acknowledge();
 
-						consumerA.Listener += MessageListenerA;
-						consumerB.Listener += MessageListenerB;
+								Assert.AreEqual(msgText, rcvMsg.Text, "Message text does not match.");
+								Assert.IsTrue(rcvMsg.IsExpired());
 
-						for(int index = 0; index < totalMsgs; index++)
+								if(AcknowledgementMode.Transactional == ackMode)
+								{
+									session.Commit();
+								}
+							}
+							else
+							{
+								// Should not receive a message.
+								Assert.IsNull(rcvMsg, "Received an expired message!");
+							}
+						}
+					}
+					finally
+					{
+						try
 						{
-							string msgText = "Message #" + index;
-							Tracer.Info("Sending: " + msgText);
-							producer.Send(session.CreateTextMessage(msgText));
+							// Give the Broker some time to remove the subscriptions.
+							Thread.Sleep(2000);
+							SessionUtil.DeleteDestination(session, destinationName);
 						}
-
-						int waitCount = 0;
-						while(receivedA < totalMsgs && receivedB < totalMsgs)
+						catch
 						{
-							if(waitCount++ > 50)
-							{
-								Assert.Fail("Timed out waiting for message consumers.  A = " + receivedA + ", B = " + receivedB);
-							}
-
-							Tracer.Info("Waiting... Received A = " + receivedA + ", Received B = " + receivedB);
-							Thread.Sleep(250);
 						}
 					}
-                    
-                    // Give the Broker some time to remove the subscriptions.
-                    Thread.Sleep(2000);
-
-                    try
-                    {
-                        ((Session) session).DeleteDestination(session.GetQueue(CONSUMER_A_DESTINATION_NAME));
-                        ((Session) session).DeleteDestination(session.GetQueue(CONSUMER_B_DESTINATION_NAME));
-				    }
-                    catch
-                    {
-                    }
-                }
+				}
 			}
 		}
-
-		private void MessageListenerA(IMessage message)
-		{
-			message.Acknowledge();
-			ITextMessage messageA = message as ITextMessage;
-			string msgText = "Message #" + receivedA;
-			Assert.AreEqual(msgText, messageA.Text, "Message text A does not match.");
-			Tracer.Info("Received Listener A: " + msgText);
-			receivedA++;
-		}
-
-		private void MessageListenerB(IMessage message)
-		{
-			message.Acknowledge();
-			ITextMessage messageB = message as ITextMessage;
-			string msgText = "Message #" + receivedB;
-			Assert.AreEqual(msgText, messageB.Text, "Message text B does not match.");
-			Tracer.Info("Received Listener B: " + msgText);
-			receivedB++;
-		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj Tue Aug 10 23:53:33 2010
@@ -2,7 +2,7 @@
   <PropertyGroup>
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
     <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
-    <ProductVersion>9.0.21022</ProductVersion>
+    <ProductVersion>9.0.30729</ProductVersion>
     <SchemaVersion>2.0</SchemaVersion>
     <ProjectGuid>{EB943C69-2C9B-45E7-B95B-FB916E7057ED}</ProjectGuid>
     <OutputType>Library</OutputType>
@@ -74,6 +74,7 @@
     <Compile Include="src\test\csharp\CommonAssemblyInfo.cs" />
     <Compile Include="src\test\csharp\ConnectionFactoryTest.cs" />
     <Compile Include="src\test\csharp\ConnectionMetaDataTest.cs" />
+    <Compile Include="src\test\csharp\MessageConsumerTest.cs" />
     <Compile Include="src\test\csharp\ExclusiveConsumerTest.cs" />
     <Compile Include="src\test\csharp\IndividualAckTest.cs" />
     <Compile Include="src\test\csharp\MessageListenerRedeliveryTest.cs" />

Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs Tue Aug 10 23:53:33 2010
@@ -233,7 +233,7 @@ namespace Apache.NMS.Util
 
             foreach(string key in props.Keys)
             {
-                if(key.StartsWith(prefix))
+                if(key.StartsWith(prefix, true, CultureInfo.InvariantCulture))
                 {
                     String value = props[key];
                     result[key] = value;



Mime
View raw message