activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1695615 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/MessageConsumer.cs test/csharp/IndividualAckTest.cs
Date Wed, 12 Aug 2015 22:00:20 GMT
Author: tabish
Date: Wed Aug 12 22:00:20 2015
New Revision: 1695615

URL: http://svn.apache.org/r1695615
Log:
Ensure the unacknowledged messages are rolled back from the duplicate tracker on consumer close.
Fixes [AMQNET-506]. (See https://issues.apache.org/jira/browse/AMQNET-506)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1695615&r1=1695614&r2=1695615&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Wed Aug 12 22:00:20 2015
@@ -507,7 +507,7 @@ namespace Apache.NMS.ActiveMQ
                     this.session.Scheduler.Cancel(this.optimizedAckTask);
                 }
 
-                if (this.session.IsClientAcknowledge)
+                if (this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
                 {
                     if (!this.info.Browser)
                     {

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs?rev=1695615&r1=1695614&r2=1695615&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/IndividualAckTest.cs
Wed Aug 12 22:00:20 2015
@@ -260,5 +260,62 @@ namespace Apache.NMS.ActiveMQ.Test
             Assert.IsNull(msg);
             session.Close();
         }
+
+        [Test]
+        public void TestIndividualAcksWithClosedConsumerAndAuditSync()
+        {
+            const int MSG_COUNT = 20;
+            const string QUEUE_NAME = "TEST.TestIndividualAcksWithClosedConsumerAndAuditSync";
+
+            ProduceSomeMessages(MSG_COUNT, QUEUE_NAME);
+
+            string uri = "failover:(tcp://${activemqhost}:61616)";
+            IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+
+            using (IConnection connection = factory.CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge))
+            using (IQueue queue = session.GetQueue(QUEUE_NAME))
+            {
+                connection.Start();
+
+                // Consume all messages with no ACK
+                using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                {
+                    for (int i = 0; i < MSG_COUNT; ++i)
+                    {
+                        IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                        Assert.NotNull(message);
+                        Tracer.DebugFormat("Received message: {0}", message.NMSMessageId);
+                    }
+                }
+
+                // Consumer the same batch again.
+                using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                {
+                    for (int i = 0; i < MSG_COUNT; ++i)
+                    {
+                        IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                        Assert.NotNull(message);
+                        Tracer.DebugFormat("Received message: {0}", message.NMSMessageId);
+                    }
+                }
+
+                session.DeleteDestination(queue);
+            }
+        }
+
+        private void ProduceSomeMessages(int count, string queueName)
+        {
+            using (IConnection connection = CreateConnection())
+            using (ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge))
+            using (IQueue queue = session.GetQueue(queueName))
+            using (IMessageProducer producer = session.CreateProducer(queue))
+            {
+                for (int i = 0; i < count; ++i)
+                {
+                    producer.Send(session.CreateMessage());
+                }
+            }
+        }
     }
 }



Mime
View raw message