activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1695745 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x: ./ src/main/csharp/MessageConsumer.cs src/test/csharp/IndividualAckTest.cs src/test/csharp/QueueBrowserTests.cs src/test/csharp/ZeroPrefetchConsumerTest.cs
Date Thu, 13 Aug 2015 18:01:06 GMT
Author: tabish
Date: Thu Aug 13 18:01:06 2015
New Revision: 1695745

URL: http://svn.apache.org/r1695745
Log:
Merge fixes for 

AMQNET-505
AMQNET-506
AMQNET-507
AMQNET-508

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Aug 13 18:01:06 2015
@@ -1,4 +1,4 @@
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843,1171874,1177390,1177395,1186568,1187123,1238881,1293360,1294890,1295257,1311395,1312026,1374469,1375295,1376782
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183
-/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:1689517
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:1689517,1695609-1695737

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs?rev=1695745&r1=1695744&r2=1695745&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/main/csharp/MessageConsumer.cs Thu Aug 13 18:01:06 2015
@@ -507,7 +507,7 @@ namespace Apache.NMS.ActiveMQ
                     this.session.Scheduler.Cancel(this.optimizedAckTask);
                 }
 
-                if (this.session.IsClientAcknowledge)
+                if (this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge)
                 {
                     if (!this.info.Browser)
                     {
@@ -1035,7 +1035,7 @@ namespace Apache.NMS.ActiveMQ
                 {
                     return null;
                 }
-                else if(!IgnoreExpiration && dispatch.Message.IsExpired())
+                else if(ConsumeExpiredMessage(dispatch))
                 {
                     Tracer.DebugFormat("Consumer[{0}] received expired message: {1}",
                                        ConsumerId, dispatch.Message.MessageId);
@@ -1058,6 +1058,8 @@ namespace Apache.NMS.ActiveMQ
                             timeout = deadline - dispatchTime;
                         }
                     }
+
+                    SendPullRequest((long) timeout.TotalMilliseconds);
                 }
                 else if (RedeliveryExceeded(dispatch))
                 {
@@ -1065,6 +1067,25 @@ namespace Apache.NMS.ActiveMQ
                                        ConsumerId, dispatch);
                     PosionAck(dispatch, "dispatch to " + ConsumerId + " exceeds redelivery " +
                                         "policy limit:" + redeliveryPolicy.MaximumRedeliveries);
+
+                    // Refresh the dispatch time
+                    dispatchTime = DateTime.Now;
+
+                    if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed)
+                    {
+                        if(dispatchTime > deadline)
+                        {
+                            // Out of time.
+                            timeout = TimeSpan.Zero;
+                        }
+                        else
+                        {
+                            // Adjust the timeout to the remaining time.
+                            timeout = deadline - dispatchTime;
+                        }
+                    }
+
+                    SendPullRequest((long) timeout.TotalMilliseconds);
                 }
                 else
                 {
@@ -1073,6 +1094,16 @@ namespace Apache.NMS.ActiveMQ
             }
         }
 
+        private bool ConsumeExpiredMessage(MessageDispatch dispatch) 
+        {
+            if (dispatch.Message.IsExpired()) 
+            {
+                return !info.Browser && !IgnoreExpiration;
+            }
+
+            return false;
+        }
+
         public virtual void BeforeMessageIsConsumed(MessageDispatch dispatch)
         {
             dispatch.DeliverySequenceId = session.NextDeliveryId;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs?rev=1695745&r1=1695744&r2=1695745&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/IndividualAckTest.cs Thu Aug 13 18:01:06 2015
@@ -260,5 +260,62 @@ namespace Apache.NMS.ActiveMQ.Test
             Assert.IsNull(msg);
             session.Close();
         }
+
+        [Test]
+        public void TestIndividualAcksWithClosedConsumerAndAuditSync()
+        {
+            const int MSG_COUNT = 20;
+            const string QUEUE_NAME = "TEST.TestIndividualAcksWithClosedConsumerAndAuditSync";
+
+            ProduceSomeMessages(MSG_COUNT, QUEUE_NAME);
+
+            string uri = "failover:(tcp://${activemqhost}:61616)";
+            IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+
+            using (IConnection connection = factory.CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge))
+            using (IQueue queue = session.GetQueue(QUEUE_NAME))
+            {
+                connection.Start();
+
+                // Consume all messages with no ACK
+                using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                {
+                    for (int i = 0; i < MSG_COUNT; ++i)
+                    {
+                        IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                        Assert.NotNull(message);
+                        Tracer.DebugFormat("Received message: {0}", message.NMSMessageId);
+                    }
+                }
+
+                // Consumer the same batch again.
+                using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                {
+                    for (int i = 0; i < MSG_COUNT; ++i)
+                    {
+                        IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                        Assert.NotNull(message);
+                        Tracer.DebugFormat("Received message: {0}", message.NMSMessageId);
+                    }
+                }
+
+                session.DeleteDestination(queue);
+            }
+        }
+
+        private void ProduceSomeMessages(int count, string queueName)
+        {
+            using (IConnection connection = CreateConnection())
+            using (ISession session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge))
+            using (IQueue queue = session.GetQueue(queueName))
+            using (IMessageProducer producer = session.CreateProducer(queue))
+            {
+                for (int i = 0; i < count; ++i)
+                {
+                    producer.Send(session.CreateMessage());
+                }
+            }
+        }
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs?rev=1695745&r1=1695744&r2=1695745&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/QueueBrowserTests.cs Thu Aug 13 18:01:06 2015
@@ -205,6 +205,80 @@ namespace Apache.NMS.ActiveMQ.Test
                 IQueueBrowser browser = session.CreateBrowser(queue);
 				browser.Close();
             }
-        }		
+        }
+
+        [Test]
+        public void TestBrowsingExpiration()
+        {
+            const int MESSAGES_TO_SEND = 50;
+            const string QUEUE_NAME = "TEST.TestBrowsingExpiration";
+
+            // Browse the queue.
+            using (Connection connection = CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            using (IQueue queue = session.GetQueue(QUEUE_NAME))
+            {
+                session.DeleteDestination(queue);
+
+                SendTestMessages(MESSAGES_TO_SEND, QUEUE_NAME);
+
+                connection.Start();
+                int browsed = Browse(QUEUE_NAME, connection);
+
+                // The number of messages browsed should be equal to the number of
+                // messages sent.
+                Assert.AreEqual(MESSAGES_TO_SEND, browsed);
+
+                // Broker expired message period is 30 seconds by default
+                for (int i = 0; i < 12; ++i)
+                {
+                    Thread.Sleep(5000);
+                    browsed = Browse(QUEUE_NAME, connection);
+                }
+
+                session.DeleteDestination(session.GetQueue(QUEUE_NAME));
+
+                Assert.AreEqual(0, browsed);
+            }
+        }
+
+        private int Browse(String queueName, Connection connection)
+        {
+            int browsed = 0;
+
+            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            using (IQueue queue = session.GetQueue(queueName))
+            using (IQueueBrowser browser = session.CreateBrowser(queue))
+            {
+                IEnumerator enumeration = browser.GetEnumerator();
+                while (enumeration.MoveNext())
+                {
+                    ITextMessage message = enumeration.Current as ITextMessage;
+                    Tracer.DebugFormat("Browsed message: {0}", message.NMSMessageId);
+                    browsed++;
+                }
+            }
+
+            return browsed;
+        }
+
+        protected void SendTestMessages(int count, String queueName)
+        {
+            // Send the messages to the Queue.
+            using (Connection connection = CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            using (IQueue queue = session.GetQueue(queueName))
+            using (IMessageProducer producer = session.CreateProducer(queue))
+            {
+                for (int i = 1; i <= count; i++) 
+                {
+                    String msgStr = "Message: " + i;
+                    producer.Send(session.CreateTextMessage(msgStr), 
+                                  MsgDeliveryMode.NonPersistent, 
+                                  MsgPriority.Normal, 
+                                  TimeSpan.FromMilliseconds(1500));
+                }
+            }
+        }
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs?rev=1695745&r1=1695744&r2=1695745&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.7.x/src/test/csharp/ZeroPrefetchConsumerTest.cs Thu Aug 13 18:01:06 2015
@@ -154,6 +154,54 @@ namespace Apache.NMS.ActiveMQ.Test
             Assert.IsNull(answer, "Should have not received a message!");
         }
 
+        [Test]
+        public void TestConsumerReceivePrefetchZeroRedeliveryZero()
+        {
+            const string QUEUE_NAME = "TEST.TestConsumerReceivePrefetchZeroRedeliveryZero";
+
+            using (Connection connection = CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+            using (IQueue queue = session.GetQueue(QUEUE_NAME))
+            {              
+                session.DeleteDestination(queue);
+
+                using (IMessageProducer producer = session.CreateProducer(queue))
+                {
+                    ITextMessage textMessage = session.CreateTextMessage("test Message");
+                    producer.Send(textMessage);
+                }
+            }
+
+            // consume and rollback - increase redelivery counter on message
+            using (Connection connection = CreateConnection() as Connection)
+            using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+            using (IQueue queue = session.GetQueue(QUEUE_NAME))
+            using (IMessageConsumer consumer = session.CreateConsumer(queue))
+            {              
+                connection.Start();
+                IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                Assert.IsNotNull(message);
+                session.Rollback();
+            }
+
+            // try consume with timeout - expect it to timeout and return NULL message
+            using (Connection connection = CreateConnection() as Connection)
+            {
+                connection.PrefetchPolicy.All = 0;
+                connection.RedeliveryPolicy.MaximumRedeliveries = 0;
+                connection.Start();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IQueue queue = session.GetQueue(QUEUE_NAME);
+
+                using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                {
+                    IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(3000));
+                    Assert.IsNull(message);
+                }
+            }
+        }
+
         [SetUp]
         public override void SetUp()
         {



Mime
View raw message