Author: jgomes
Date: Tue Aug 10 23:53:33 2010
New Revision: 984268
URL: http://svn.apache.org/viewvc?rev=984268&view=rev
Log:
Added support for processing expired messages that were received from the broker.
Fixes [AMQNET-268]. (See https://issues.apache.org/activemq/browse/AMQNET-268)
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
- copied, changed from r982821, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Tue Aug 10 23:53:33 2010
@@ -97,9 +97,14 @@ namespace Apache.NMS.ActiveMQ
// If the destination contained a URI query, then use it to set public properties
// on the ConsumerInfo
if(destination.Options != null)
- {
+ {
+ // Get options prefixed with "consumer.*"
StringDictionary options = URISupport.GetProperties(destination.Options, "consumer.");
+ // Extract out custom extension options "consumer.nms.*"
+ StringDictionary customConsumerOptions = URISupport.ExtractProperties(options, "nms.");
+
URISupport.SetProperties(this.info, options);
+ URISupport.SetProperties(this, customConsumerOptions, "nms.");
}
}
@@ -145,7 +150,15 @@ namespace Apache.NMS.ActiveMQ
public long UnconsumedMessageCount
{
get { return this.unconsumedMessages.Count; }
- }
+ }
+
+ // Custom Options
+ private bool ignoreExpiration = false;
+ public bool IgnoreExpiration
+ {
+ get { return ignoreExpiration; }
+ set { ignoreExpiration = value; }
+ }
#endregion
@@ -335,7 +348,7 @@ namespace Apache.NMS.ActiveMQ
this.session.Connection.Oneway(removeCommand);
this.session = null;
- Tracer.Debug("Consumer instnace Closed.");
+ Tracer.Debug("Consumer instance Closed.");
}
}
@@ -541,7 +554,7 @@ namespace Apache.NMS.ActiveMQ
try
{
- bool expired = message.IsExpired();
+ bool expired = (!IgnoreExpiration && message.IsExpired());
if(!expired)
{
@@ -672,7 +685,7 @@ namespace Apache.NMS.ActiveMQ
{
return null;
}
- else if(dispatch.Message.IsExpired())
+ else if(!IgnoreExpiration && dispatch.Message.IsExpired())
{
Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
@@ -727,7 +740,7 @@ namespace Apache.NMS.ActiveMQ
return;
}
- if(expired == true)
+ if(expired)
{
lock(this.dispatchedMessages)
{
Copied: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
(from r982821, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs)
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs?p2=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs&p1=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs&r1=982821&r2=984268&rev=984268&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
Tue Aug 10 23:53:33 2010
@@ -18,172 +18,116 @@
using System.Threading;
using Apache.NMS.Test;
using NUnit.Framework;
+using Apache.NMS.ActiveMQ.Commands;
+using System;
+using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Test
{
- [TestFixture]
- public class VirtualTopicTest : NMSTestSupport
+ public enum ExpirationOptions
{
- protected static string DESTINATION_NAME = "TestDestination";
- protected static string PRODUCER_DESTINATION_NAME = "VirtualTopic." + DESTINATION_NAME;
- protected static string CONSUMER_A_DESTINATION_NAME = "Consumer.A." + PRODUCER_DESTINATION_NAME;
- protected static string CONSUMER_B_DESTINATION_NAME = "Consumer.B." + PRODUCER_DESTINATION_NAME;
- protected static string TEST_CLIENT_ID = "VirtualTopicTestClientId";
+ DEFAULT,
+ IGNORE,
+ DO_NOT_IGNORE
+ }
- protected const int totalMsgs = 5;
+ [TestFixture]
+ public class MessageConsumerTest : NMSTestSupport
+ {
+ protected static string DESTINATION_NAME = "queue://TestDestination";
+ protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId";
[Test]
- public void SendReceiveVirtualTopicMessage(
+ public void TestReceiveIgnoreExpirationMessage(
[Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
AcknowledgementMode ackMode,
[Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
- MsgDeliveryMode deliveryMode)
+ MsgDeliveryMode deliveryMode,
+ [Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
+ ExpirationOptions expirationOption)
{
using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
- using(ISession session = connection.CreateSession(ackMode))
+ using(Session session = connection.CreateSession(ackMode) as Session)
{
- using(IMessageConsumer consumerA = session.CreateConsumer(session.GetQueue(CONSUMER_A_DESTINATION_NAME)))
- using(IMessageConsumer consumerB = session.CreateConsumer(session.GetQueue(CONSUMER_B_DESTINATION_NAME)))
- using(IMessageProducer producer = session.CreateProducer(session.GetTopic(PRODUCER_DESTINATION_NAME)))
+ string destinationName = DESTINATION_NAME;
+
+ if(ExpirationOptions.IGNORE == expirationOption)
+ {
+ destinationName += "?consumer.nms.ignoreExpiration=true";
+ }
+ else if(ExpirationOptions.DO_NOT_IGNORE == expirationOption)
{
- producer.RequestTimeout = receiveTimeout;
- producer.DeliveryMode = deliveryMode;
+ destinationName += "?consumer.nms.ignoreExpiration=false";
+ }
- for(int index = 0; index < totalMsgs; index++)
- {
- string msgText = "Message #" + index;
- Tracer.Info("Sending: " + msgText);
- producer.Send(session.CreateTextMessage(msgText));
- }
+ try
+ {
+ IDestination destination = SessionUtil.GetDestination(session, destinationName);
- if(AcknowledgementMode.Transactional == ackMode)
+ using(IMessageConsumer consumer = session.CreateConsumer(destination))
+ using(IMessageProducer producer = session.CreateProducer(destination))
{
- session.Commit();
- }
+ producer.RequestTimeout = receiveTimeout;
+ producer.DeliveryMode = deliveryMode;
- for(int index = 0; index < totalMsgs; index++)
- {
- string msgText = "Message #" + index;
- ITextMessage messageA = consumerA.Receive(receiveTimeout) as ITextMessage;
- Assert.IsNotNull(messageA, "Did not receive message for consumer A.");
- messageA.Acknowledge();
- Tracer.Info("Received A: " + msgText);
-
- ITextMessage messageB = consumerB.Receive(receiveTimeout) as ITextMessage;
- Assert.IsNotNull(messageB, "Did not receive message for consumer B.");
- messageB.Acknowledge();
- Tracer.Info("Received B: " + msgText);
+ string msgText = "ExpiredMessage:" + Guid.NewGuid().ToString();
- Assert.AreEqual(msgText, messageA.Text, "Message text A does not match.");
- Assert.AreEqual(msgText, messageB.Text, "Message text B does not match.");
- }
+ ActiveMQTextMessage msg = session.CreateTextMessage(msgText) as ActiveMQTextMessage;
- if(AcknowledgementMode.Transactional == ackMode)
- {
- session.Commit();
- }
- }
+ // Give it two seconds to live.
+ msg.NMSTimeToLive = TimeSpan.FromMilliseconds(2000);
- // Give the Broker some time to remove the subscriptions.
- Thread.Sleep(2000);
+ producer.Send(msg);
- try
- {
- ((Session) session).DeleteDestination(session.GetQueue(CONSUMER_A_DESTINATION_NAME));
- ((Session) session).DeleteDestination(session.GetQueue(CONSUMER_B_DESTINATION_NAME));
- }
- catch
- {
- }
- }
- }
- }
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ session.Commit();
+ }
- protected int receivedA;
- protected int receivedB;
+ // Wait for four seconds before processing it. The broker will have sent it to our
local
+ // client dispatch queue, but we won't attempt to process the message until it has
had
+ // a chance to expire within our internal queue system.
+ Thread.Sleep(4000);
- [Test]
- // Do not use listeners with transactional processing.
- public void AsyncSendReceiveVirtualTopicMessage(
- [Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge, AcknowledgementMode.DupsOkAcknowledge)]
- AcknowledgementMode ackMode,
- [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
- MsgDeliveryMode deliveryMode)
- {
- receivedA = 0;
- receivedB = 0;
+ ActiveMQTextMessage rcvMsg = consumer.ReceiveNoWait() as ActiveMQTextMessage;
- using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
- {
- connection.Start();
- using(ISession session = connection.CreateSession(ackMode))
- {
- using(IMessageConsumer consumerA = session.CreateConsumer(session.GetQueue(CONSUMER_A_DESTINATION_NAME)))
- using(IMessageConsumer consumerB = session.CreateConsumer(session.GetQueue(CONSUMER_B_DESTINATION_NAME)))
- using(IMessageProducer producer = session.CreateProducer(session.GetTopic(PRODUCER_DESTINATION_NAME)))
- {
- producer.RequestTimeout = receiveTimeout;
- producer.DeliveryMode = deliveryMode;
+ if(ExpirationOptions.IGNORE == expirationOption)
+ {
+ Assert.IsNotNull(rcvMsg, "Did not receive expired message.");
+ rcvMsg.Acknowledge();
- consumerA.Listener += MessageListenerA;
- consumerB.Listener += MessageListenerB;
+ Assert.AreEqual(msgText, rcvMsg.Text, "Message text does not match.");
+ Assert.IsTrue(rcvMsg.IsExpired());
- for(int index = 0; index < totalMsgs; index++)
+ if(AcknowledgementMode.Transactional == ackMode)
+ {
+ session.Commit();
+ }
+ }
+ else
+ {
+ // Should not receive a message.
+ Assert.IsNull(rcvMsg, "Received an expired message!");
+ }
+ }
+ }
+ finally
+ {
+ try
{
- string msgText = "Message #" + index;
- Tracer.Info("Sending: " + msgText);
- producer.Send(session.CreateTextMessage(msgText));
+ // Give the Broker some time to remove the subscriptions.
+ Thread.Sleep(2000);
+ SessionUtil.DeleteDestination(session, destinationName);
}
-
- int waitCount = 0;
- while(receivedA < totalMsgs && receivedB < totalMsgs)
+ catch
{
- if(waitCount++ > 50)
- {
- Assert.Fail("Timed out waiting for message consumers. A = " + receivedA + ", B =
" + receivedB);
- }
-
- Tracer.Info("Waiting... Received A = " + receivedA + ", Received B = " + receivedB);
- Thread.Sleep(250);
}
}
-
- // Give the Broker some time to remove the subscriptions.
- Thread.Sleep(2000);
-
- try
- {
- ((Session) session).DeleteDestination(session.GetQueue(CONSUMER_A_DESTINATION_NAME));
- ((Session) session).DeleteDestination(session.GetQueue(CONSUMER_B_DESTINATION_NAME));
- }
- catch
- {
- }
- }
+ }
}
}
-
- private void MessageListenerA(IMessage message)
- {
- message.Acknowledge();
- ITextMessage messageA = message as ITextMessage;
- string msgText = "Message #" + receivedA;
- Assert.AreEqual(msgText, messageA.Text, "Message text A does not match.");
- Tracer.Info("Received Listener A: " + msgText);
- receivedA++;
- }
-
- private void MessageListenerB(IMessage message)
- {
- message.Acknowledge();
- ITextMessage messageB = message as ITextMessage;
- string msgText = "Message #" + receivedB;
- Assert.AreEqual(msgText, messageB.Text, "Message text B does not match.");
- Tracer.Info("Received Listener B: " + msgText);
- receivedB++;
- }
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj Tue Aug
10 23:53:33 2010
@@ -2,7 +2,7 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProductVersion>9.0.21022</ProductVersion>
+ <ProductVersion>9.0.30729</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{EB943C69-2C9B-45E7-B95B-FB916E7057ED}</ProjectGuid>
<OutputType>Library</OutputType>
@@ -74,6 +74,7 @@
<Compile Include="src\test\csharp\CommonAssemblyInfo.cs" />
<Compile Include="src\test\csharp\ConnectionFactoryTest.cs" />
<Compile Include="src\test\csharp\ConnectionMetaDataTest.cs" />
+ <Compile Include="src\test\csharp\MessageConsumerTest.cs" />
<Compile Include="src\test\csharp\ExclusiveConsumerTest.cs" />
<Compile Include="src\test\csharp\IndividualAckTest.cs" />
<Compile Include="src\test\csharp\MessageListenerRedeliveryTest.cs" />
Modified: activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs Tue Aug 10
23:53:33 2010
@@ -233,7 +233,7 @@ namespace Apache.NMS.Util
foreach(string key in props.Keys)
{
- if(key.StartsWith(prefix))
+ if(key.StartsWith(prefix, true, CultureInfo.InvariantCulture))
{
String value = props[key];
result[key] = value;
|