activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r892315 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x: ./ nant-common.xml src/test/csharp/RollbackRedeliveryTest.cs vs2008-activemq-test.csproj vs2008-activemq.csproj vs2008-activemq.sln
Date Fri, 18 Dec 2009 17:01:53 GMT
Author: tabish
Date: Fri Dec 18 17:01:52 2009
New Revision: 892315

URL: http://svn.apache.org/viewvc?rev=892315&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQNET-220

Updated RedeliveryRollbackTests to check for the case where max redeliveries is reached. 
Added test for single session and multiple session consumption.  Updated the project files
with the new test.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/test/csharp/RollbackRedeliveryTest.cs
      - copied, changed from r891917, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/nant-common.xml   (props changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq-test.csproj
  (contents, props changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq.csproj   (props
changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq.sln   (props
changed)

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Dec 18 17:01:52 2009
@@ -1,2 +1,3 @@
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:891917,892010-892308

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/nant-common.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Dec 18 17:01:52 2009
@@ -1 +1,2 @@
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0/nant-common.xml:788230,788233,790183
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/nant-common.xml:891917,892010-892308

Copied: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/test/csharp/RollbackRedeliveryTest.cs
(from r891917, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs)
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/test/csharp/RollbackRedeliveryTest.cs?p2=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/test/csharp/RollbackRedeliveryTest.cs&p1=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs&r1=891917&r2=892315&rev=892315&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/test/csharp/RollbackRedeliveryTest.cs
Fri Dec 18 17:01:52 2009
@@ -27,30 +27,30 @@
 using NUnit.Framework;
 
 namespace Apache.NMS.Test
-{		
-	[TestFixture]	
-	public class RollbackRedeliveryTest : NMSTestSupport
-	{
-		protected static string DESTINATION_NAME = "TestDestination";
-		protected static string TEST_CLIENT_ID = "RollbackRedeliveryTestId";		
-
-	    private const int nbMessages = 10;
-    	private const String destinationName = "RollbackRedeliveryTestDestination";
-		
+{
+    [TestFixture]
+    public class RollbackRedeliveryTest : NMSTestSupport
+    {
+        protected static string DESTINATION_NAME = "TestDestination";
+        protected static string TEST_CLIENT_ID = "RollbackRedeliveryTestId";
+
+        private const int nbMessages = 10;
+        private const String destinationName = "RollbackRedeliveryTestDestination";
+
         private IConnection connection;
-		
-		public RollbackRedeliveryTest()
-		{
-		}
-		
+
+        public RollbackRedeliveryTest()
+        {
+        }
+
         [SetUp]
         public override void SetUp()
         {
             base.SetUp();
-    
+
             connection = CreateConnection();
             connection.Start();
-            
+
             Session session = connection.CreateSession() as Session;
             IQueue queue = session.GetQueue(destinationName);
             session.DeleteDestination(queue);
@@ -62,317 +62,414 @@
             connection.Close();
             base.TearDown();
         }
-		
+
         [Test]
-	    public void testRedelivery() 
+        public void TestRedelivery()
         {
             // Use non-interleaved producer and default prefetch.
-	        DoTestRedelivery((connection as Connection).PrefetchPolicy.QueuePrefetch, false);
-	    }
-	
-//	    public void testRedeliveryWithInterleavedProducer() {
-//	        doTestRedelivery(brokerUrl, true);
-//	    }
-//	
-//	    public void testRedeliveryWithPrefetch0() {
-//	        doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0", true);
-//	    }
-//	    
-//	    public void testRedeliveryWithPrefetch1() {
-//	        doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=1", true);
-//	    }
-	    
-	    public void DoTestRedelivery(int queuePrefetch, bool interleaveProducer)
-        {	        
+            DoTestRedelivery((connection as Connection).PrefetchPolicy.QueuePrefetch, false);
+        }
+
+//        [Test]
+//        public void TestRedeliveryWithInterleavedProducer()
+//        {
+//            DoTestRedelivery((connection as Connection).PrefetchPolicy.QueuePrefetch, true);
+//        }
+//
+//        [Test]
+//        public void TestRedeliveryWithPrefetch0()
+//        {
+//            DoTestRedelivery(0, true);
+//        }
+//
+//        [Test]
+//        public void TestRedeliveryWithPrefetch1()
+//        {
+//            DoTestRedelivery(1, true);
+//        }
+
+        public void DoTestRedelivery(int queuePrefetch, bool interleaveProducer)
+        {
             (connection as Connection).PrefetchPolicy.QueuePrefetch = queuePrefetch;
-            
-	        connection.Start();
-	
-	        if(interleaveProducer) 
+
+            connection.Start();
+
+            if(interleaveProducer)
+            {
+                PopulateDestinationWithInterleavedProducer(nbMessages, destinationName);
+            }
+            else
             {
-	            PopulateDestinationWithInterleavedProducer(nbMessages, destinationName);
-	        } 
-            else 
+                PopulateDestination(nbMessages, destinationName);
+            }
+
+            // Consume messages and Rollback transactions
             {
-	            PopulateDestination(nbMessages, destinationName);
+                int received = 0;
+                IDictionary rolledback = Hashtable.Synchronized(new Hashtable());
+
+                while(received < nbMessages)
+                {
+                    ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                    IDestination destination = session.GetQueue(destinationName);
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
+
+                    if(msg != null)
+                    {
+                        if(msg != null && rolledback.Contains(msg.Text))
+                        {
+                            Interlocked.Increment(ref received);
+                            Assert.IsTrue(msg.NMSRedelivered);
+                            Assert.AreEqual(2, msg.Properties.GetLong("NMSXDeliveryCount"));
+                            session.Commit();
+                        }
+                        else
+                        {
+                            Assert.IsFalse(msg.NMSRedelivered);
+                            rolledback.Add(msg.Text, (Boolean) true);
+                            session.Rollback();
+                        }
+                    }
+                    consumer.Close();
+                    session.Close();
+                }
+            }
+        }
+
+        [Test]
+	    public void TestRedeliveryOnSingleConsumer()
+        {
+	        connection.Start();
+
+	        PopulateDestinationWithInterleavedProducer(nbMessages, destinationName);
+
+	        // Consume messages and Rollback transactions
+	        {
+	            int received = 0;
+                IDictionary rolledback = Hashtable.Synchronized(new Hashtable());
+	            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+	            IDestination destination = session.GetQueue(destinationName);
+	            IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                while(received < nbMessages)
+                {
+	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
+	                if(msg != null)
+                    {
+                        if(msg != null && rolledback.Contains(msg.Text))
+                        {
+                            Interlocked.Increment(ref received);
+	                        Assert.IsTrue(msg.NMSRedelivered);
+	                        session.Commit();
+	                    }
+                        else
+                        {
+                            rolledback.Add(msg.Text, (Boolean) true);
+	                        session.Rollback();
+	                    }
+	                }
+	            }
+	            consumer.Close();
+	            session.Close();
 	        }
-	        
+	    }
+
+        [Test]
+        public void TestRedeliveryOnSingleSession()
+        {
+	        connection.Start();
+
+	        PopulateDestination(nbMessages, destinationName);
+
 	        // Consume messages and Rollback transactions
 	        {
                 int received = 0;
                 IDictionary rolledback = Hashtable.Synchronized(new Hashtable());
+	            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+	            IDestination destination = session.GetQueue(destinationName);
 
-                while(received < nbMessages) 
+                while(received < nbMessages)
                 {
-	                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	                IDestination destination = session.GetQueue(destinationName);
 	                IMessageConsumer consumer = session.CreateConsumer(destination);
 	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
-	                
-                    if(msg != null) 
+	                if(msg != null)
                     {
-	                    if(msg != null && rolledback.Contains(msg.Text)) 
+                        if(msg != null && rolledback.Contains(msg.Text))
                         {
-                            Interlocked.Increment(ref received);                        
   
-	                        Console.WriteLine("Received message " + msg.Text + " (" + received
+ ")" + msg.NMSMessageId);
+                            Interlocked.Increment(ref received);
 	                        Assert.IsTrue(msg.NMSRedelivered);
-	                        Assert.AreEqual(2, msg.Properties.GetLong("NMSXDeliveryCount"));
 	                        session.Commit();
-	                    } 
-                        else 
+	                    }
+                        else
                         {
-	                        Console.WriteLine("Rollback message " + msg.Text + " id: " +  msg.NMSMessageId);
-	                        Assert.IsFalse(msg.NMSRedelivered);
                             rolledback.Add(msg.Text, (Boolean) true);
 	                        session.Rollback();
 	                    }
 	                }
 	                consumer.Close();
-	                session.Close();
 	            }
+	            session.Close();
 	        }
 	    }
-   
-//	    public void testRedeliveryOnSingleConsumer() {
-//	
-//	        ConnectionFactory connectionFactory = 
-//	            new ActiveMQConnectionFactory(brokerUrl);
-//	        Connection connection = connectionFactory.createConnection();
-//	        connection.Start();
-//	
-//	        populateDestinationWithInterleavedProducer(nbMessages, destinationName, connection);
-//	
-//	        // Consume messages and Rollback transactions
-//	        {
-//	            AtomicInteger received = new AtomicInteger();
-//	            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
-//	            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-//	            IDestination destination = session.GetQueue(destinationName);
-//	            IMessageConsumer consumer = session.CreateConsumer(destination);         
  
-//	            while (received.get() < nbMessages) {
-//	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
-//	                if (msg != null) {
-//	                    if (msg != null && rolledback.put(msg.Text, Boolean.TRUE)
!= null) {
-//	                        LOG.info("Received message " + msg.Text + " (" + received.getAndIncrement()
+ ")" + msg.NMSMessageId);
-//	                        assertTrue(msg.NMSRedelivered());
-//	                        session.Commit();
-//	                    } else {
-//	                        LOG.info("Rollback message " + msg.Text + " id: " +  msg.NMSMessageId);
-//	                        session.Rollback();
-//	                    }
-//	                }
-//	            }
-//	            consumer.Close();
-//	            session.Close();
-//	        }
-//	    }
-	    
-//        [Test]        
-//	    public void TestRedeliveryOnSingleSession() 
-//        {
-//	        connection.Start();
-//	
-//	        PopulateDestination(nbMessages, destinationName);
-//	
-//	        // Consume messages and Rollback transactions
-//	        {
-//                int received = 0;
-//                IDictionary rolledback = Hashtable.Synchronized(new Hashtable());
-//	            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-//	            IDestination destination = session.GetQueue(destinationName);
-//	            
-//                while(received < nbMessages) 
-//                {
-//	                IMessageConsumer consumer = session.CreateConsumer(destination);     
      
-//	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
-//	                if(msg != null) 
-//                    {
-//                        if(msg != null && rolledback.Contains(msg.Text)) 
-//                        {
-//                            Interlocked.Increment(ref received);
-//                            Console.WriteLine("Received Message " + msg.Text + "(" + received
+ ") " + msg.NMSMessageId);
-//	                        Assert.IsTrue(msg.NMSRedelivered);
-//	                        session.Commit();
-//	                    } 
-//                        else 
-//                        {
-//                            Console.WriteLine("Rollback message " + msg.Text + " id: "
+  msg.NMSMessageId);                            
-//                            rolledback.Add(msg.Text, (Boolean) true);
-//	                        session.Rollback();
-//	                    }
-//	                }
-//	                consumer.Close();
-//	            }
-//	            session.Close();
-//	        }
-//	    }
-	    
-		[Test]
-	    public void TestValidateRedeliveryCountOnRollback() 
-		{
-	        const int numMessages = 1;
-	        connection.Start();
-	
-	        PopulateDestination(numMessages, destinationName);
-	
-	        {
+
+        [Test]
+        public void TestMessageRedelivedMaxRedeliveriesTimesSingleSession()
+        {
+            connection.RedeliveryPolicy.MaximumRedeliveries = 15;
+            connection.RedeliveryPolicy.UseCollisionAvoidance = false;
+            connection.RedeliveryPolicy.UseExponentialBackOff = false;
+
+            connection.Start();
+
+            PopulateDestination(1, destinationName);
+
+            // Consume messages and Rollback transactions
+            {
+                int received = 0;
+                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.GetQueue(destinationName);
+
+                IMessageConsumer consumer = session.CreateConsumer(destination);
+                ITextMessage msg = null;
+
+                while(received <= connection.RedeliveryPolicy.MaximumRedeliveries)
+                {
+                    msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
+                    Assert.IsNotNull(msg);
+
+                    if(received > 0)
+                    {
+                        Assert.IsTrue(msg.NMSRedelivered);
+                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount")
- 1);
+                    }
+
+                    Interlocked.Increment(ref received);
+
+                    session.Rollback();
+                }
+
+                msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000));
+                Assert.IsNull(msg);
+
+                session.Close();
+            }
+        }
+
+        [Test]
+        public void TestMessageRedelivedMaxRedeliveriesTimesMultipleSessions()
+        {
+            connection.RedeliveryPolicy.MaximumRedeliveries = 15;
+            connection.RedeliveryPolicy.UseCollisionAvoidance = false;
+            connection.RedeliveryPolicy.UseExponentialBackOff = false;
+
+            connection.Start();
+
+            PopulateDestination(1, destinationName);
+
+            // Consume messages and Rollback transactions
+            {
+                int received = 0;
+
+                ISession session = null;
+                IDestination destination = null;
+                IMessageConsumer consumer = null;
+                ITextMessage msg = null;
+
+                while(received <= connection.RedeliveryPolicy.MaximumRedeliveries)
+                {
+                    session = connection.CreateSession(AcknowledgementMode.Transactional);
+                    destination = session.GetQueue(destinationName);
+                    consumer = session.CreateConsumer(destination);
+                    msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
+                    Assert.IsNotNull(msg);
+
+                    if(received > 0)
+                    {
+                        Assert.IsTrue(msg.NMSRedelivered);
+                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount")
- 1);
+                    }
+
+                    Interlocked.Increment(ref received);
+
+                    session.Rollback();
+                    session.Close();
+                }
+
+                session = connection.CreateSession(AcknowledgementMode.Transactional);
+                destination = session.GetQueue(destinationName);
+                consumer = session.CreateConsumer(destination);
+                msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000));
+                Assert.IsNull(msg);
+            }
+        }
+
+        [Test]
+        public void TestValidateRedeliveryCountOnRollback()
+        {
+            const int numMessages = 1;
+            connection.Start();
+
+            PopulateDestination(numMessages, destinationName);
+
+            {
                 int received = 0;
                 int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
 
-                while(received < maxRetries) 
-    			{
+                while(received < maxRetries)
+                {
                     ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	                IDestination destination = session.GetQueue(destinationName);
-	
-                    IMessageConsumer consumer = session.CreateConsumer(destination);    
       
+                    IDestination destination = session.GetQueue(destinationName);
+
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
                     ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
-                    
-                    if(msg != null) 
+
+                    if(msg != null)
                     {
                         Interlocked.Increment(ref received);
-                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),

+                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),
                                         "redelivery property matches deliveries");
                         session.Rollback();
                     }
                     session.Close();
                 }
-                
+
                 ConsumeMessage(maxRetries + 1);
             }
-	    }
+        }
 
-		[Test]
-	    public void TestValidateRedeliveryCountOnRollbackWithPrefetch0() 
-		{	
-			const int numMessages = 1;
-			(connection as Connection).PrefetchPolicy.SetAll(0);
-	        connection.Start();
-	
-	        PopulateDestination(numMessages, destinationName);
-	
-	        {
-	            int received = 0;
-	            int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
-				
-	            while(received < maxRetries) 
-				{
-	                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	                IDestination destination = session.GetQueue(destinationName);
-	
-	                IMessageConsumer consumer = session.CreateConsumer(destination);       
    
-	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
-	                
-					if(msg != null) 
-					{
-						Interlocked.Increment(ref received);
-	                    Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),

-						                "redelivery property matches deliveries");
-	                    session.Rollback();
-	                }
-	                session.Close();
-	            }
-	            
-	            ConsumeMessage(maxRetries + 1);
-	        }
-	    }
-	
-	    private void ConsumeMessage(int deliveryCount) 
-		{
-	        ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	        IDestination destination = session.GetQueue(destinationName);
-	        IMessageConsumer consumer = session.CreateConsumer(destination);            
-	        ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(1000));
-	        Assert.IsNotNull(msg);
-	        Assert.AreEqual(deliveryCount, msg.Properties.GetLong("NMSXDeliveryCount"),
-			                "redelivery property matches deliveries");
-	        session.Commit();
-	        session.Close();
-	    }
-	
-		[Test]
-	    public void TestRedeliveryPropertyWithNoRollback() 
-		{
-	        const int numMessages = 1;
-	
-	        PopulateDestination(numMessages, destinationName);
-	        connection.Close();
-	        
-	        {
-	            int received = 0;
-	            
-				int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
-	            
-				while(received < maxRetries) 
-				{
-	                connection = CreateConnection();
-	                connection.Start();
-	                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	                IDestination destination = session.GetQueue(destinationName);
-	
-	                IMessageConsumer consumer = session.CreateConsumer(destination);       
    
-	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
-	                
-					if(msg != null) 
-					{
-						Interlocked.Increment(ref received);
-	                    Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),

-						                "redelivery property matches deliveries");
-	                }
-					
-	                session.Close();
-	                connection.Close();
-	            }
-				
-	            connection = CreateConnection();
-	            connection.Start();
-	            ConsumeMessage(maxRetries + 1);
-	        }
-	    }
-	    
-	    private void PopulateDestination(int nbMessages, string destinationName) 
-		{
-	        ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-	        IDestination destination = session.GetQueue(destinationName);
-	        IMessageProducer producer = session.CreateProducer(destination);
-	        
-			for(int i = 1; i <= nbMessages; i++) 
-			{
-	            producer.Send(session.CreateTextMessage("<hello id='" + i + "'/>"));
-	        }
-			
-	        producer.Close();
-	        session.Close();
-	    }
-	
-	    private void PopulateDestinationWithInterleavedProducer(int nbMessages, string destinationName)

-		{
-	        ISession session1 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-	        IDestination destination1 = session1.GetQueue(destinationName);
-	        IMessageProducer producer1 = session1.CreateProducer(destination1);
-			producer1.DeliveryMode = MsgDeliveryMode.NonPersistent;
-	        
-			ISession session2 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-	        IDestination destination2 = session2.GetQueue(destinationName);
-	        IMessageProducer producer2 = session2.CreateProducer(destination2);
+        [Test]
+        public void TestValidateRedeliveryCountOnRollbackWithPrefetch0()
+        {
+            const int numMessages = 1;
+            (connection as Connection).PrefetchPolicy.SetAll(0);
+            connection.Start();
+
+            PopulateDestination(numMessages, destinationName);
+
+            {
+                int received = 0;
+                int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
+
+                while(received < maxRetries)
+                {
+                    ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                    IDestination destination = session.GetQueue(destinationName);
+
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
+
+                    if(msg != null)
+                    {
+                        Interlocked.Increment(ref received);
+                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),
+                                        "redelivery property matches deliveries");
+                        session.Rollback();
+                    }
+                    session.Close();
+                }
+
+                ConsumeMessage(maxRetries + 1);
+            }
+        }
+
+        private void ConsumeMessage(int deliveryCount)
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+            IDestination destination = session.GetQueue(destinationName);
+            IMessageConsumer consumer = session.CreateConsumer(destination);
+            ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(msg);
+            Assert.AreEqual(deliveryCount, msg.Properties.GetLong("NMSXDeliveryCount"),
+                            "redelivery property matches deliveries");
+            session.Commit();
+            session.Close();
+        }
+
+        [Test]
+        public void TestRedeliveryPropertyWithNoRollback()
+        {
+            const int numMessages = 1;
+
+            PopulateDestination(numMessages, destinationName);
+            connection.Close();
+
+            {
+                int received = 0;
+
+                int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
+
+                while(received < maxRetries)
+                {
+                    connection = CreateConnection();
+                    connection.Start();
+                    ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                    IDestination destination = session.GetQueue(destinationName);
+
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
+
+                    if(msg != null)
+                    {
+                        Interlocked.Increment(ref received);
+                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),
+                                        "redelivery property matches deliveries");
+                    }
+
+                    session.Close();
+                    connection.Close();
+                }
+
+                connection = CreateConnection();
+                connection.Start();
+                ConsumeMessage(maxRetries + 1);
+            }
+        }
+
+        private void PopulateDestination(int nbMessages, string destinationName)
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IDestination destination = session.GetQueue(destinationName);
+            IMessageProducer producer = session.CreateProducer(destination);
+
+            for(int i = 1; i <= nbMessages; i++)
+            {
+                producer.Send(session.CreateTextMessage("<hello id='" + i + "'/>"));
+            }
+
+            producer.Close();
+            session.Close();
+        }
+
+        private void PopulateDestinationWithInterleavedProducer(int nbMessages, string destinationName)
+        {
+            ISession session1 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IDestination destination1 = session1.GetQueue(destinationName);
+            IMessageProducer producer1 = session1.CreateProducer(destination1);
+            producer1.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+            ISession session2 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IDestination destination2 = session2.GetQueue(destinationName);
+            IMessageProducer producer2 = session2.CreateProducer(destination2);
             producer2.DeliveryMode = MsgDeliveryMode.NonPersistent;
 
-	        for(int i = 1; i <= nbMessages; i++) 
-			{
-	            if(i%2 == 0)
-				{
-	                producer1.Send(session1.CreateTextMessage("<hello id='" + i + "'/>"));
-	            }
-				else 
-				{
-	                producer2.Send(session2.CreateTextMessage("<hello id='" + i + "'/>"));
-	            }
-	        }
-			
-	        producer1.Close();
-	        session1.Close();
-	        producer2.Close();
-	        session2.Close();
-	    }
-		
-	}
+            for(int i = 1; i <= nbMessages; i++)
+            {
+                if(i%2 == 0)
+                {
+                    producer1.Send(session1.CreateTextMessage("<hello id='" + i + "'/>"));
+                }
+                else
+                {
+                    producer2.Send(session2.CreateTextMessage("<hello id='" + i + "'/>"));
+                }
+            }
+
+            producer1.Close();
+            session1.Close();
+            producer2.Close();
+            session2.Close();
+        }
+
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq-test.csproj?rev=892315&r1=892314&r2=892315&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq-test.csproj
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq-test.csproj
Fri Dec 18 17:01:52 2009
@@ -86,6 +86,7 @@
     </Compile>
     <Compile Include="src\test\csharp\OpenWire\MaxInactivityDurationTest.cs" />
     <Compile Include="src\test\csharp\OpenWire\PrefetchSizeZeroTest.cs" />
+    <Compile Include="src\test\csharp\RollbackRedeliveryTest.cs" />
     <Compile Include="src\test\csharp\StompHelperTest.cs" />
     <Compile Include="src\test\csharp\Threads\DedicatedTaskRunnerTest.cs" />
     <Compile Include="src\test\csharp\Transport\failover\FailoverTransportTest.cs" />

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq-test.csproj
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Dec 18 17:01:52 2009
@@ -1 +1,2 @@
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0/vs2008-activemq-test.csproj:788230,788233,790183
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj:891917,892010-892308

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq.csproj
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Dec 18 17:01:52 2009
@@ -1 +1,2 @@
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0/vs2008-activemq.csproj:788230,788233,790183
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj:891917,892010-892308

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/vs2008-activemq.sln
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Dec 18 17:01:52 2009
@@ -1 +1,2 @@
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0/vs2008-activemq.sln:788230,788233,790183
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.sln:891917,892010-892308



Mime
View raw message