Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 21480 invoked from network); 31 Mar 2011 22:00:21 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 31 Mar 2011 22:00:21 -0000 Received: (qmail 72603 invoked by uid 500); 31 Mar 2011 22:00:21 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 72577 invoked by uid 500); 31 Mar 2011 22:00:21 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 72570 invoked by uid 99); 31 Mar 2011 22:00:21 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 31 Mar 2011 22:00:21 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 31 Mar 2011 22:00:18 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DA13B23888FE; Thu, 31 Mar 2011 21:59:56 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1087454 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/MessageConsumer.cs test/csharp/AMQRedeliveryPolicyTest.cs Date: Thu, 31 Mar 2011 21:59:56 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110331215956.DA13B23888FE@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Thu Mar 31 21:59:56 2011 New Revision: 1087454 URL: http://svn.apache.org/viewvc?rev=1087454&view=rev Log: fix for: https://issues.apache.org/jira/browse/AMQNET-323 Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1087454&r1=1087453&r2=1087454&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Mar 31 21:59:56 2011 @@ -661,18 +661,7 @@ namespace Apache.NMS.ActiveMQ MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait(); if(dispatch != null) { - try - { - ActiveMQMessage message = CreateActiveMQMessage(dispatch); - BeforeMessageIsConsumed(dispatch); - listener(message); - AfterMessageIsConsumed(dispatch, false); - } - catch(NMSException e) - { - this.session.Connection.OnSessionException(this.session, e); - } - + this.Dispatch(dispatch); return true; } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=1087454&r1=1087453&r2=1087454&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs Thu Mar 31 21:59:56 2011 @@ -16,6 +16,7 @@ */ using System; +using System.Threading; using Apache.NMS.Test; using Apache.NMS.ActiveMQ.Commands; using NUnit.Framework; @@ -343,5 +344,104 @@ namespace Apache.NMS.ActiveMQ.Test } } + [Test] + public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLive() + { + using(Connection connection = (Connection) CreateConnection()) + { + IRedeliveryPolicy policy = connection.RedeliveryPolicy; + policy.MaximumRedeliveries = -1; + policy.InitialRedeliveryDelay = 500; + policy.UseExponentialBackOff = false; + + connection.Start(); + ISession session = connection.CreateSession(AcknowledgementMode.Transactional); + IDestination destination = session.CreateTemporaryQueue(); + + IMessageProducer producer = session.CreateProducer(destination); + IMessageConsumer consumer = session.CreateConsumer(destination); + + // Send the messages + ITextMessage textMessage = session.CreateTextMessage("1st"); + textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0); + producer.Send(textMessage); + session.Commit(); + + ITextMessage m; + m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000)); + Assert.IsNotNull(m); + Assert.AreEqual("1st", m.Text); + session.Rollback(); + + // No delay on first Rollback.. + m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); + Assert.IsNotNull(m); + session.Rollback(); + + // Show subsequent re-delivery delay is incrementing. + m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100)); + Assert.IsNull(m); + m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700)); + Assert.IsNotNull(m); + Assert.AreEqual("1st", m.Text); + session.Rollback(); + + // The message gets redelivered after 500 ms every time since + // we are not using exponential backoff. + m = (ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700)); + Assert.IsNull(m); + } + } + + [Test] + public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLiveCallback() + { + using(Connection connection = (Connection) CreateConnection()) + { + IRedeliveryPolicy policy = connection.RedeliveryPolicy; + policy.MaximumRedeliveries = -1; + policy.InitialRedeliveryDelay = 500; + policy.UseExponentialBackOff = false; + + connection.Start(); + ISession session = connection.CreateSession(AcknowledgementMode.Transactional); + IDestination destination = session.CreateTemporaryQueue(); + + IMessageProducer producer = session.CreateProducer(destination); + IMessageConsumer consumer = session.CreateConsumer(destination); + CallbackClass cc = new CallbackClass(session); + consumer.Listener += new MessageListener(cc.consumer_Listener); + + // Send the messages + ITextMessage textMessage = session.CreateTextMessage("1st"); + textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0); + producer.Send(textMessage, MsgDeliveryMode.Persistent,MsgPriority.Normal,TimeSpan.FromMilliseconds(800.0)); + session.Commit(); + + // sends normal message, then immediate retry, then retry after 500 ms, then expire. + Thread.Sleep(2000); + Assert.AreEqual(3, cc.numReceived); + } + } + + class CallbackClass + { + private ISession session; + public int numReceived = 0; + + public CallbackClass(ISession session) + { + this.session = session; + } + + public void consumer_Listener(IMessage message) + { + numReceived++; + ITextMessage m = message as ITextMessage; + Assert.IsNotNull(m); + Assert.AreEqual("1st", m.Text); + session.Rollback(); + } + } } }