activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1201327 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src: main/csharp/MessageConsumer.cs test/csharp/MessageConsumerTest.cs
Date Sat, 12 Nov 2011 21:05:21 GMT
Author: tabish
Date: Sat Nov 12 21:05:21 2011
New Revision: 1201327

URL: http://svn.apache.org/viewvc?rev=1201327&view=rev
Log:
Fix for: https://issues.apache.org/jira/browse/AMQNET-335

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/MessageConsumerTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs?rev=1201327&r1=1201326&r2=1201327&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
Sat Nov 12 21:05:21 2011
@@ -655,6 +655,7 @@ namespace Apache.NMS.ActiveMQ
 						if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
 						{
 							// Redeliver the message
+                            Rollback();
 						}
 						else
 						{

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/MessageConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/MessageConsumerTest.cs?rev=1201327&r1=1201326&r2=1201327&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/MessageConsumerTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/MessageConsumerTest.cs
Sat Nov 12 21:05:21 2011
@@ -1,178 +1,274 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System.Threading;
-using Apache.NMS.Test;
-using NUnit.Framework;
-using Apache.NMS.ActiveMQ.Commands;
-using System;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.Test
-{
-    public enum ExpirationOptions
-    {
-        DEFAULT,
-        IGNORE,
-        DO_NOT_IGNORE
-    }
-
-    [TestFixture]
-    public class MessageConsumerTest : NMSTestSupport
-    {
-        protected static string DESTINATION_NAME = "queue://MessageConsumerTestDestination";
-        protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId";
-
-        [Test]
-        public void ConsumeInTwoThreads()
-        {
-            ParameterizedThreadStart threadStart =
-                delegate(object o)
-                {
-                    IMessageConsumer consumer = (IMessageConsumer)o;
-                    IMessage message = consumer.Receive(TimeSpan.FromSeconds(2));
-                    Assert.IsNotNull(message);
-                };
-
-            using (IConnection connection = CreateConnection(TEST_CLIENT_ID))
-            {
-                connection.Start();
-                using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
-                {
-                    IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME)
as IQueue;
-
-                    // enqueue 2 messages
-                    using (IMessageConsumer consumer = session.CreateConsumer(queue))
-                    using (IMessageProducer producer = session.CreateProducer(queue))
-                    {
-                        producer.DeliveryMode = MsgDeliveryMode.Persistent;
-                        producer.Send(producer.CreateMessage());
-                        producer.Send(producer.CreateMessage());
-                        session.Commit();
-
-                        // receive first using a dedicated thread. This works
-                        Thread thread = new Thread(threadStart);
-                        thread.Start(consumer);
-                        thread.Join();
-                        session.Commit();
-
-                        // receive second using main thread. This FAILS
-                        IMessage message = consumer.Receive(TimeSpan.FromSeconds(2)); //
throws System.Threading.AbandonedMutexException
-                        Assert.IsNotNull(message);
-                        session.Commit();
-                    }
-                }
-            }
-        }

-        [Test]
-        public void TestReceiveIgnoreExpirationMessage(
-            [Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
-                AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
-            AcknowledgementMode ackMode,
-            [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
-            MsgDeliveryMode deliveryMode,
-            [Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
-            ExpirationOptions expirationOption)
-        {
-            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
-            {
-                connection.Start();
-                using(Session session = connection.CreateSession(ackMode) as Session)
-                {
-                    string destinationName = DESTINATION_NAME;
-
-                    if(ExpirationOptions.IGNORE == expirationOption)
-                    {
-                        destinationName += "?consumer.nms.ignoreExpiration=true";
-                    }
-                    else if(ExpirationOptions.DO_NOT_IGNORE == expirationOption)
-                    {
-                        destinationName += "?consumer.nms.ignoreExpiration=false";
-                    }
-
-                    try
-                    {
-                        IDestination destination = SessionUtil.GetDestination(session, destinationName);
-
-                        using(IMessageConsumer consumer = session.CreateConsumer(destination))
-                        using(IMessageProducer producer = session.CreateProducer(destination))
-                        {
-                            producer.DeliveryMode = deliveryMode;
-
-                            string msgText = "ExpiredMessage:" + Guid.NewGuid().ToString();
-
-                            ActiveMQTextMessage msg = session.CreateTextMessage(msgText)
as ActiveMQTextMessage;
-
-                            // Give it two seconds to live.
-                            msg.NMSTimeToLive = TimeSpan.FromMilliseconds(2000);
-
-                            producer.Send(msg);
-
-                            if(AcknowledgementMode.Transactional == ackMode)
-                            {
-                                session.Commit();
-                            }
-
-                            // Wait for four seconds before processing it.  The broker will
have sent it to our local
-                            // client dispatch queue, but we won't attempt to process the
message until it has had
-                            // a chance to expire within our internal queue system.
-                            Thread.Sleep(4000);
-
-                            ActiveMQTextMessage rcvMsg = consumer.ReceiveNoWait() as ActiveMQTextMessage;
-
-                            if(ExpirationOptions.IGNORE == expirationOption)
-                            {
-                                Assert.IsNotNull(rcvMsg, "Did not receive expired message.");
-                                rcvMsg.Acknowledge();
-
-                                Assert.AreEqual(msgText, rcvMsg.Text, "Message text does
not match.");
-                                Assert.IsTrue(rcvMsg.IsExpired());
-
-                                if(AcknowledgementMode.Transactional == ackMode)
-                                {
-                                    session.Commit();
-                                }
-                            }
-                            else
-                            {
-                                // Should not receive a message.
-                                Assert.IsNull(rcvMsg, "Received an expired message!");
-                            }
-
-                            consumer.Close();
-                            producer.Close();
-                        }
-                    }
-                    finally
-                    {
-                        try
-                        {
-                            // Ensure that Session resources on the Broker release transacted
Consumers.
-                            session.Close();
-                            // Give the Broker some time to remove the subscriptions.
-                            Thread.Sleep(2000);
-                            SessionUtil.DeleteDestination(session, destinationName);
-                        }
-                        catch
-                        {
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using Apache.NMS.Test;
+using NUnit.Framework;
+using Apache.NMS.ActiveMQ.Commands;
+using System;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    public enum ExpirationOptions
+    {
+        DEFAULT,
+        IGNORE,
+        DO_NOT_IGNORE
+    }
+
+    [TestFixture]
+    public class MessageConsumerTest : NMSTestSupport
+    {
+        protected static string DESTINATION_NAME = "queue://MessageConsumerTestDestination";
+        protected static string TEST_CLIENT_ID = "MessageConsumerTestClientId";
+
+        private CountDownLatch doneLatch;
+        private int counter;
+        private String errorMessage;
+
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+
+            this.doneLatch = new CountDownLatch(1);
+            this.counter = 0;
+            this.errorMessage = null;
+        }
+
+        [Test]
+        public void TestAsyncDispatchExceptionRedelivers()
+        {
+            using (IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                {
+                    IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME)
as IQueue;
+
+                    using (IMessageProducer producer = session.CreateProducer(queue))
+                    {
+                        producer.DeliveryMode = MsgDeliveryMode.NonPersistent;
+                        producer.Send(producer.CreateTextMessage("First"));
+                        producer.Send(producer.CreateTextMessage("Second"));
+                    }
+
+                    using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        consumer.Listener += OnTestAsynchRedliversMessage;
+
+                        connection.Start();
+
+                        if (doneLatch.await(TimeSpan.FromSeconds(10)))
+                        {
+                            if (!String.IsNullOrEmpty(errorMessage))
+                            {
+                                Assert.Fail(errorMessage);
+                            }
+                        }
+                        else
+                        {
+                            Assert.Fail("Timeout waiting for async message delivery to complete.");
+                        }
+                    }
+                }
+            }
+        }
+
+        private void OnTestAsynchRedliversMessage(IMessage msg)
+        {
+            counter++;
+            try
+            {
+                ITextMessage message = msg as ITextMessage;
+                switch (counter)
+                {
+                case 1:
+                    Tracer.Debug("Got first Message: " + message.Text);
+                    Assert.AreEqual("First", message.Text);
+                    Assert.IsFalse(message.NMSRedelivered);
+                    break;
+                case 2:
+                    Tracer.Debug("Got Second Message: " + message.Text);
+                    Assert.AreEqual("Second", message.Text);
+                    Assert.IsFalse(message.NMSRedelivered);
+                    throw new Exception("Ignore Me");
+                case 3:
+                    Tracer.Debug("Got Third Message: " + message.Text);
+                    Assert.AreEqual("Second", message.Text);
+                    Assert.IsTrue(message.NMSRedelivered);
+                    doneLatch.countDown();
+                    break;
+                default:
+                    errorMessage = "Got too many messages: " + counter;
+                    Tracer.Debug(errorMessage);
+                    doneLatch.countDown();
+                    break;
+                }
+            }
+            catch (Exception e)
+            {
+                if (e.Message.Equals("Ignore Me"))
+                {
+                    throw;
+                }
+                errorMessage = "Got exception: " + e.Message;
+                Tracer.Warn("Exception on Message Receive: " + e.Message);
+                doneLatch.countDown();
+            }
+        }
+
+        [Test]
+        public void ConsumeInTwoThreads()
+        {
+            ParameterizedThreadStart threadStart =
+                delegate(object o)
+                {
+                    IMessageConsumer consumer = (IMessageConsumer)o;
+                    IMessage message = consumer.Receive(TimeSpan.FromSeconds(2));
+                    Assert.IsNotNull(message);
+                };
+
+            using (IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                connection.Start();
+                using (ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+                {
+                    IQueue queue = SessionUtil.GetDestination(session, DESTINATION_NAME)
as IQueue;
+
+                    // enqueue 2 messages
+                    using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                    using (IMessageProducer producer = session.CreateProducer(queue))
+                    {
+                        producer.DeliveryMode = MsgDeliveryMode.Persistent;
+                        producer.Send(producer.CreateMessage());
+                        producer.Send(producer.CreateMessage());
+                        session.Commit();
+
+                        // receive first using a dedicated thread. This works
+                        Thread thread = new Thread(threadStart);
+                        thread.Start(consumer);
+                        thread.Join();
+                        session.Commit();
+
+                        // receive second using main thread. This FAILS
+                        IMessage message = consumer.Receive(TimeSpan.FromSeconds(2)); //
throws System.Threading.AbandonedMutexException
+                        Assert.IsNotNull(message);
+                        session.Commit();
+                    }
+                }
+            }
+        }
+
+        [Test]
+        public void TestReceiveIgnoreExpirationMessage(
+            [Values(AcknowledgementMode.AutoAcknowledge, AcknowledgementMode.ClientAcknowledge,
+                AcknowledgementMode.DupsOkAcknowledge, AcknowledgementMode.Transactional)]
+            AcknowledgementMode ackMode,
+            [Values(MsgDeliveryMode.NonPersistent, MsgDeliveryMode.Persistent)]
+            MsgDeliveryMode deliveryMode,
+            [Values(ExpirationOptions.DEFAULT, ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
+            ExpirationOptions expirationOption)
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID))
+            {
+                connection.Start();
+                using(Session session = connection.CreateSession(ackMode) as Session)
+                {
+                    string destinationName = DESTINATION_NAME;
+
+                    if(ExpirationOptions.IGNORE == expirationOption)
+                    {
+                        destinationName += "?consumer.nms.ignoreExpiration=true";
+                    }
+                    else if(ExpirationOptions.DO_NOT_IGNORE == expirationOption)
+                    {
+                        destinationName += "?consumer.nms.ignoreExpiration=false";
+                    }
+
+                    try
+                    {
+                        IDestination destination = SessionUtil.GetDestination(session, destinationName);
+
+                        using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                        using(IMessageProducer producer = session.CreateProducer(destination))
+                        {
+                            producer.DeliveryMode = deliveryMode;
+
+                            string msgText = "ExpiredMessage:" + Guid.NewGuid().ToString();
+
+                            ActiveMQTextMessage msg = session.CreateTextMessage(msgText)
as ActiveMQTextMessage;
+
+                            // Give it two seconds to live.
+                            msg.NMSTimeToLive = TimeSpan.FromMilliseconds(2000);
+
+                            producer.Send(msg);
+
+                            if(AcknowledgementMode.Transactional == ackMode)
+                            {
+                                session.Commit();
+                            }
+
+                            // Wait for four seconds before processing it.  The broker will
have sent it to our local
+                            // client dispatch queue, but we won't attempt to process the
message until it has had
+                            // a chance to expire within our internal queue system.
+                            Thread.Sleep(4000);
+
+                            ActiveMQTextMessage rcvMsg = consumer.ReceiveNoWait() as ActiveMQTextMessage;
+
+                            if(ExpirationOptions.IGNORE == expirationOption)
+                            {
+                                Assert.IsNotNull(rcvMsg, "Did not receive expired message.");
+                                rcvMsg.Acknowledge();
+
+                                Assert.AreEqual(msgText, rcvMsg.Text, "Message text does
not match.");
+                                Assert.IsTrue(rcvMsg.IsExpired());
+
+                                if(AcknowledgementMode.Transactional == ackMode)
+                                {
+                                    session.Commit();
+                                }
+                            }
+                            else
+                            {
+                                // Should not receive a message.
+                                Assert.IsNull(rcvMsg, "Received an expired message!");
+                            }
+
+                            consumer.Close();
+                            producer.Close();
+                        }
+                    }
+                    finally
+                    {
+                        try
+                        {
+                            // Ensure that Session resources on the Broker release transacted
Consumers.
+                            session.Close();
+                            // Give the Broker some time to remove the subscriptions.
+                            Thread.Sleep(2000);
+                            SessionUtil.DeleteDestination(session, destinationName);
+                        }
+                        catch
+                        {
+                        }
+                    }
+                }
+            }
+        }
+    }
+}



Mime
View raw message