activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r896478 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk: src/test/csharp/ConsumerTest.cs vs2008-stomp-test.csproj
Date Wed, 06 Jan 2010 15:29:50 GMT
Author: tabish
Date: Wed Jan  6 15:29:50 2010
New Revision: 896478

URL: http://svn.apache.org/viewvc?rev=896478&view=rev
Log:
Add in a version of the ConsumerTest from NMS.

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/ConsumerTest.cs   (with
props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/ConsumerTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/ConsumerTest.cs?rev=896478&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/ConsumerTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/ConsumerTest.cs Wed Jan
 6 15:29:50 2010
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.Test;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.Stomp.Test
+{
+    [TestFixture]
+    public class ConsumerTest : NMSTestSupport
+    {
+        protected static string TEST_CLIENT_ID = "TestConsumerClientId";
+        protected const int COUNT = 25;
+        protected const string VALUE_NAME = "value";
+
+        private bool dontAck;
+
+// The .NET CF does not have the ability to interrupt threads, so this test is impossible.
+#if !NETCF
+        [RowTest]
+        [Row(AcknowledgementMode.AutoAcknowledge)]
+        [Row(AcknowledgementMode.ClientAcknowledge)]
+        [Row(AcknowledgementMode.DupsOkAcknowledge)]
+        [Row(AcknowledgementMode.Transactional)]
+        public void TestNoTimeoutConsumer(AcknowledgementMode ackMode)
+        {
+            // Launch a thread to perform IMessageConsumer.Receive().
+            // If it doesn't fail in less than three seconds, no exception was thrown.
+            Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+                using(ISession session = connection.CreateSession(ackMode))
+                {
+                    ITemporaryQueue queue = session.CreateTemporaryQueue();
+                    using(this.timeoutConsumer = session.CreateConsumer(queue))
+                    {
+                        receiveThread.Start();
+                        if(receiveThread.Join(3000))
+                        {
+                            Assert.Fail("IMessageConsumer.Receive() returned without blocking.
 Test failed.");
+                        }
+                        else
+                        {
+                            // Kill the thread - otherwise it'll sit in Receive() until a
message arrives.
+                            receiveThread.Interrupt();
+                        }
+                    }
+                }
+            }
+        }
+
+        protected IMessageConsumer timeoutConsumer;
+
+        protected void TimeoutConsumerThreadProc()
+        {
+            try
+            {
+                timeoutConsumer.Receive();
+            }
+            catch(ArgumentOutOfRangeException e)
+            {
+                // The test failed.  We will know because the timeout will expire inside
TestNoTimeoutConsumer().
+                Assert.Fail("Test failed with exception: " + e.Message);
+            }
+            catch(ThreadInterruptedException)
+            {
+                // The test succeeded!  We were still blocked when we were interrupted.
+            }
+            catch(Exception e)
+            {
+                // Some other exception occurred.
+                Assert.Fail("Test failed with exception: " + e.Message);
+            }
+        }
+
+        [RowTest]
+        [Row(AcknowledgementMode.AutoAcknowledge)]
+        [Row(AcknowledgementMode.ClientAcknowledge)]
+        [Row(AcknowledgementMode.DupsOkAcknowledge)]
+        [Row(AcknowledgementMode.Transactional)]
+        public void TestSyncReceiveConsumerClose(AcknowledgementMode ackMode)
+        {
+            // Launch a thread to perform IMessageConsumer.Receive().
+            // If it doesn't fail in less than three seconds, no exception was thrown.
+            Thread receiveThread = new Thread(new ThreadStart(TimeoutConsumerThreadProc));
+            using (IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+                using (ISession session = connection.CreateSession(ackMode))
+                {
+                    ITemporaryQueue queue = session.CreateTemporaryQueue();
+                    using (this.timeoutConsumer = session.CreateConsumer(queue))
+                    {
+                        receiveThread.Start();
+                        if (receiveThread.Join(3000))
+                        {
+                            Assert.Fail("IMessageConsumer.Receive() returned without blocking.
 Test failed.");
+                        }
+                        else
+                        {
+                            // Kill the thread - otherwise it'll sit in Receive() until a
message arrives.
+                            this.timeoutConsumer.Close();
+                            receiveThread.Join(10000);
+                            if (receiveThread.IsAlive)
+                            {
+                                // Kill the thread - otherwise it'll sit in Receive() until
a message arrives.
+                                receiveThread.Interrupt();
+                                Assert.Fail("IMessageConsumer.Receive() thread is still alive,
Close should have killed it.");
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        internal class ThreadArg
+        {
+            internal IConnection connection = null;
+            internal ISession session = null;
+            internal IDestination destination = null;
+        }
+
+        protected void DelayedProducerThreadProc(Object arg)
+        {
+            try
+            {
+                ThreadArg args = arg as ThreadArg;
+
+                using(ISession session = args.connection.CreateSession())
+                {
+                    using(IMessageProducer producer = session.CreateProducer(args.destination))
+                    {
+                        // Give the consumer time to enter the receive.
+                        Thread.Sleep(5000);
+
+                        producer.Send(args.session.CreateTextMessage("Hello World"));
+                    }
+                }
+            }
+            catch(Exception e)
+            {
+                // Some other exception occurred.
+                Assert.Fail("Test failed with exception: " + e.Message);
+            }
+        }
+
+        [RowTest]
+        [Row(AcknowledgementMode.AutoAcknowledge)]
+        [Row(AcknowledgementMode.ClientAcknowledge)]
+        [Row(AcknowledgementMode.DupsOkAcknowledge)]
+        [Row(AcknowledgementMode.Transactional)]
+        public void TestDoChangeSentMessage(AcknowledgementMode ackMode)
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+                using(ISession session = connection.CreateSession(ackMode))
+                {
+                    ITemporaryQueue queue = session.CreateTemporaryQueue();
+                    using(IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        IMessageProducer producer = session.CreateProducer(queue);
+                        ITextMessage message = session.CreateTextMessage();
+
+                        string prefix = "ConsumerTest - TestDoChangeSentMessage: ";
+
+                        for(int i = 0; i < COUNT; i++)
+                        {
+                            message.Properties[VALUE_NAME] = i;
+                            message.Text = prefix + Convert.ToString(i);
+
+                            producer.Send(message);
+
+                            message.ClearBody();
+                            message.ClearProperties();
+                        }
+
+                        if(ackMode == AcknowledgementMode.Transactional)
+                        {
+                            session.Commit();
+                        }
+
+                        for(int i = 0; i < COUNT; i++)
+                        {
+                            ITextMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000))
as ITextMessage;
+                            Assert.AreEqual(msg.Text, prefix + Convert.ToString(i));
+                            Assert.AreEqual(msg.Properties.GetInt(VALUE_NAME), i);
+                        }
+
+                        if(ackMode == AcknowledgementMode.Transactional)
+                        {
+                            session.Commit();
+                        }
+
+                    }
+                }
+            }
+        }
+
+        [RowTest]
+        [Row(AcknowledgementMode.AutoAcknowledge)]
+        [Row(AcknowledgementMode.ClientAcknowledge)]
+        [Row(AcknowledgementMode.DupsOkAcknowledge)]
+        [Row(AcknowledgementMode.Transactional)]
+        public void TestConsumerReceiveBeforeMessageDispatched(AcknowledgementMode ackMode)
+        {
+            // Launch a thread to perform a delayed send.
+            Thread sendThread = new Thread(DelayedProducerThreadProc);
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+                using(ISession session = connection.CreateSession(ackMode))
+                {
+                    ITemporaryQueue queue = session.CreateTemporaryQueue();
+                    using(IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        ThreadArg arg = new ThreadArg();
+
+                        arg.connection = connection;
+                        arg.session = session;
+                        arg.destination = queue;
+
+                        sendThread.Start(arg);
+                        IMessage message = consumer.Receive(TimeSpan.FromMinutes(1));
+                        Assert.IsNotNull(message);
+                    }
+                }
+            }
+        }
+
+        [RowTest]
+        [Row(MsgDeliveryMode.NonPersistent, DestinationType.Queue)]
+        [Row(MsgDeliveryMode.NonPersistent, DestinationType.Topic)]
+        public void TestDontStart(MsgDeliveryMode deliveryMode, DestinationType destinationType
)
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                ISession session = connection.CreateSession();
+                IDestination destination = CreateDestination(session, destinationType);
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                // Send the messages
+                SendMessages(session, destination, deliveryMode, 1);
+
+                // Make sure no messages were delivered.
+                Assert.IsNull(consumer.Receive(TimeSpan.FromMilliseconds(1000)));
+            }
+        }
+
+        [RowTest]
+        [Row(MsgDeliveryMode.NonPersistent, DestinationType.Queue)]
+        [Row(MsgDeliveryMode.Persistent, DestinationType.Queue)]
+        [Row(MsgDeliveryMode.NonPersistent, DestinationType.Topic)]
+        [Row(MsgDeliveryMode.Persistent, DestinationType.Topic)]
+        [Row(MsgDeliveryMode.NonPersistent, DestinationType.TemporaryQueue)]
+        [Row(MsgDeliveryMode.Persistent, DestinationType.TemporaryQueue)]
+        [Row(MsgDeliveryMode.NonPersistent, DestinationType.TemporaryTopic)]
+        [Row(MsgDeliveryMode.Persistent, DestinationType.TemporaryTopic)]
+        public void TestSendReceiveTransacted(MsgDeliveryMode deliveryMode, DestinationType
destinationType)
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                // Send a message to the broker.
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = CreateDestination(session, destinationType);
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+                IMessageProducer producer = session.CreateProducer(destination);
+
+                producer.DeliveryMode = deliveryMode;
+                producer.Send(session.CreateTextMessage("Test"));
+
+                // Message should not be delivered until commit.
+                Thread.Sleep(1000);
+                Assert.IsNull(consumer.ReceiveNoWait());
+                session.Commit();
+
+                // Make sure only 1 message was delivered.
+                IMessage message = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(message);
+                Assert.IsFalse(message.NMSRedelivered);
+                Assert.IsNull(consumer.ReceiveNoWait());
+
+                // Message should be redelivered is rollback is used.
+                session.Rollback();
+
+                // Make sure only 1 message was delivered.
+                message = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+                Assert.IsNotNull(message);
+                Assert.IsTrue(message.NMSRedelivered);
+                Assert.IsNull(consumer.ReceiveNoWait());
+
+                // If we commit now, the message should not be redelivered.
+                session.Commit();
+                Thread.Sleep(1000);
+                Assert.IsNull(consumer.ReceiveNoWait());
+            }
+        }
+
+        [Test]
+        public void TestAckedMessageAreConsumed()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                IQueue queue = session.GetQueue(Guid.NewGuid().ToString());
+                IMessageProducer producer = session.CreateProducer(queue);
+                producer.Send(session.CreateTextMessage("Hello"));
+
+                // Consume the message...
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(msg);
+                msg.Acknowledge();
+
+                // Reset the session.
+                session.Close();
+                session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+                // Attempt to Consume the message...
+                consumer = session.CreateConsumer(queue);
+                msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNull(msg);
+
+                session.Close();
+            }
+        }
+
+        [Test]
+        public void TestLastMessageAcked()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                IQueue queue = session.GetQueue(Guid.NewGuid().ToString());
+                IMessageProducer producer = session.CreateProducer(queue);
+                producer.Send(session.CreateTextMessage("Hello"));
+                producer.Send(session.CreateTextMessage("Hello2"));
+                producer.Send(session.CreateTextMessage("Hello3"));
+
+                // Consume the message...
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(msg);
+                msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(msg);
+                msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(msg);
+                msg.Acknowledge();
+
+                // Reset the session.
+                session.Close();
+                session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+                // Attempt to Consume the message...
+                consumer = session.CreateConsumer(queue);
+                msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNull(msg);
+
+                session.Close();
+            }
+        }
+
+        [Test]
+        public void TestUnAckedMessageAreNotConsumedOnSessionClose()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                IQueue queue = session.GetQueue(Guid.NewGuid().ToString());
+                IMessageProducer producer = session.CreateProducer(queue);
+                producer.Send(session.CreateTextMessage("Hello"));
+
+                // Consume the message...
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(msg);
+                // Don't ack the message.
+
+                // Reset the session.  This should cause the unacknowledged message to be
re-delivered.
+                session.Close();
+                session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+                // Attempt to Consume the message...
+                consumer = session.CreateConsumer(queue);
+                msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+                Assert.IsNotNull(msg);
+                msg.Acknowledge();
+
+                session.Close();
+            }
+        }
+
+        [Test]
+        public void TestAsyncAckedMessageAreConsumed()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+                ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                IQueue queue = session.GetQueue(Guid.NewGuid().ToString());
+                IMessageProducer producer = session.CreateProducer(queue);
+                producer.Send(session.CreateTextMessage("Hello"));
+
+                // Consume the message...
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                consumer.Listener += new MessageListener(OnMessage);
+
+                Thread.Sleep(5000);
+
+                // Reset the session.
+                session.Close();
+
+                session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+
+                // Attempt to Consume the message...
+                consumer = session.CreateConsumer(queue);
+                IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNull(msg);
+
+                session.Close();
+            }
+        }
+
+        [Test]
+        public void TestAsyncUnAckedMessageAreNotConsumedOnSessionClose()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+                // don't aknowledge message on onMessage() call
+                dontAck = true;
+                ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                IQueue queue = session.GetQueue("Guid.NewGuid().ToString()");
+                IMessageProducer producer = session.CreateProducer(queue);
+                producer.Send(session.CreateTextMessage("Hello"));
+
+                // Consume the message...
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                consumer.Listener += new MessageListener(OnMessage);
+                // Don't ack the message.
+
+                // Reset the session. This should cause the Unacked message to be
+                // redelivered.
+                session.Close();
+
+                Thread.Sleep(5000);
+                session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                // Attempt to Consume the message...
+                consumer = session.CreateConsumer(queue);
+                IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(2000));
+                Assert.IsNotNull(msg);
+                msg.Acknowledge();
+
+                session.Close();
+            }
+        }
+
+        [Test]
+        public void TestAddRemoveAsnycMessageListener()
+        {
+            using(IConnection connection = CreateConnection(TEST_CLIENT_ID + ":" + new Random().Next()))
+            {
+                connection.Start();
+
+                ISession session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
+                ITemporaryTopic topic = session.CreateTemporaryTopic();
+                IMessageConsumer consumer = session.CreateConsumer(topic);
+
+                consumer.Listener += OnMessage;
+                consumer.Listener -= OnMessage;
+                consumer.Listener += OnMessage;
+
+                consumer.Close();
+            }
+        }
+
+        public void OnMessage(IMessage message)
+        {
+            Assert.IsNotNull(message);
+
+            if(!dontAck)
+            {
+                try
+                {
+                    message.Acknowledge();
+                }
+                catch(Exception)
+                {
+                }
+            }
+        }
+
+#endif
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/test/csharp/ConsumerTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj?rev=896478&r1=896477&r2=896478&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/vs2008-stomp-test.csproj Wed Jan  6 15:29:50
2010
@@ -80,5 +80,7 @@
     <Compile Include="src\test\csharp\ConnectionTest.cs" />
     <Compile Include="src\test\csharp\MessageTest.cs" />
     <Compile Include="src\test\csharp\ProducerTest.cs" />
+    <Compile Include="src\test\csharp\TextMessageTest.cs" />
+    <Compile Include="src\test\csharp\ConsumerTest.cs" />
   </ItemGroup>
 </Project>
\ No newline at end of file



Mime
View raw message