activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r892010 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs
Date Thu, 17 Dec 2009 23:50:35 GMT
Author: tabish
Date: Thu Dec 17 23:50:35 2009
New Revision: 892010

URL: http://svn.apache.org/viewvc?rev=892010&view=rev
Log:
Updated RedeliveryRollbackTest

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs?rev=892010&r1=892009&r2=892010&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/RollbackRedeliveryTest.cs
Thu Dec 17 23:50:35 2009
@@ -27,30 +27,30 @@
 using NUnit.Framework;
 
 namespace Apache.NMS.Test
-{		
-	[TestFixture]	
-	public class RollbackRedeliveryTest : NMSTestSupport
-	{
-		protected static string DESTINATION_NAME = "TestDestination";
-		protected static string TEST_CLIENT_ID = "RollbackRedeliveryTestId";		
-
-	    private const int nbMessages = 10;
-    	private const String destinationName = "RollbackRedeliveryTestDestination";
-		
+{
+    [TestFixture]
+    public class RollbackRedeliveryTest : NMSTestSupport
+    {
+        protected static string DESTINATION_NAME = "TestDestination";
+        protected static string TEST_CLIENT_ID = "RollbackRedeliveryTestId";
+
+        private const int nbMessages = 10;
+        private const String destinationName = "RollbackRedeliveryTestDestination";
+
         private IConnection connection;
-		
-		public RollbackRedeliveryTest()
-		{
-		}
-		
+
+        public RollbackRedeliveryTest()
+        {
+        }
+
         [SetUp]
         public override void SetUp()
         {
             base.SetUp();
-    
+
             connection = CreateConnection();
             connection.Start();
-            
+
             Session session = connection.CreateSession() as Session;
             IQueue queue = session.GetQueue(destinationName);
             session.DeleteDestination(queue);
@@ -62,317 +62,323 @@
             connection.Close();
             base.TearDown();
         }
-		
+
         [Test]
-	    public void testRedelivery() 
+        public void TestRedelivery()
         {
             // Use non-interleaved producer and default prefetch.
-	        DoTestRedelivery((connection as Connection).PrefetchPolicy.QueuePrefetch, false);
-	    }
-	
-//	    public void testRedeliveryWithInterleavedProducer() {
-//	        doTestRedelivery(brokerUrl, true);
-//	    }
-//	
-//	    public void testRedeliveryWithPrefetch0() {
-//	        doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0", true);
-//	    }
-//	    
-//	    public void testRedeliveryWithPrefetch1() {
-//	        doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=1", true);
-//	    }
-	    
-	    public void DoTestRedelivery(int queuePrefetch, bool interleaveProducer)
-        {	        
+            DoTestRedelivery((connection as Connection).PrefetchPolicy.QueuePrefetch, false);
+        }
+
+//        [Test]
+//        public void TestRedeliveryWithInterleavedProducer()
+//        {
+//            DoTestRedelivery((connection as Connection).PrefetchPolicy.QueuePrefetch, true);
+//        }
+//
+//        [Test]
+//        public void TestRedeliveryWithPrefetch0()
+//        {
+//            DoTestRedelivery(0, true);
+//        }
+//
+//        [Test]
+//        public void TestRedeliveryWithPrefetch1()
+//        {
+//            DoTestRedelivery(1, true);
+//        }
+
+        public void DoTestRedelivery(int queuePrefetch, bool interleaveProducer)
+        {
             (connection as Connection).PrefetchPolicy.QueuePrefetch = queuePrefetch;
-            
-	        connection.Start();
-	
-	        if(interleaveProducer) 
+
+            connection.Start();
+
+            if(interleaveProducer)
+            {
+                PopulateDestinationWithInterleavedProducer(nbMessages, destinationName);
+            }
+            else
             {
-	            PopulateDestinationWithInterleavedProducer(nbMessages, destinationName);
-	        } 
-            else 
+                PopulateDestination(nbMessages, destinationName);
+            }
+
+            // Consume messages and Rollback transactions
             {
-	            PopulateDestination(nbMessages, destinationName);
+                int received = 0;
+                IDictionary rolledback = Hashtable.Synchronized(new Hashtable());
+
+                while(received < nbMessages)
+                {
+                    ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                    IDestination destination = session.GetQueue(destinationName);
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
+
+                    if(msg != null)
+                    {
+                        if(msg != null && rolledback.Contains(msg.Text))
+                        {
+                            Interlocked.Increment(ref received);
+                            Assert.IsTrue(msg.NMSRedelivered);
+                            Assert.AreEqual(2, msg.Properties.GetLong("NMSXDeliveryCount"));
+                            session.Commit();
+                        }
+                        else
+                        {
+                            Assert.IsFalse(msg.NMSRedelivered);
+                            rolledback.Add(msg.Text, (Boolean) true);
+                            session.Rollback();
+                        }
+                    }
+                    consumer.Close();
+                    session.Close();
+                }
+            }
+        }
+
+        [Test]
+	    public void TestRedeliveryOnSingleConsumer()
+        {
+	        connection.Start();
+
+	        PopulateDestinationWithInterleavedProducer(nbMessages, destinationName);
+
+	        // Consume messages and Rollback transactions
+	        {
+	            int received = 0;
+                IDictionary rolledback = Hashtable.Synchronized(new Hashtable());
+	            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+	            IDestination destination = session.GetQueue(destinationName);
+	            IMessageConsumer consumer = session.CreateConsumer(destination);
+
+                while(received < nbMessages)
+                {
+	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
+	                if(msg != null)
+                    {
+                        if(msg != null && rolledback.Contains(msg.Text))
+                        {
+                            Interlocked.Increment(ref received);
+	                        Assert.IsTrue(msg.NMSRedelivered);
+	                        session.Commit();
+	                    }
+                        else
+                        {
+                            rolledback.Add(msg.Text, (Boolean) true);
+	                        session.Rollback();
+	                    }
+	                }
+	            }
+	            consumer.Close();
+	            session.Close();
 	        }
-	        
+	    }
+
+        [Test]
+        public void TestRedeliveryOnSingleSession()
+        {
+	        connection.Start();
+
+	        PopulateDestination(nbMessages, destinationName);
+
 	        // Consume messages and Rollback transactions
 	        {
                 int received = 0;
                 IDictionary rolledback = Hashtable.Synchronized(new Hashtable());
+	            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+	            IDestination destination = session.GetQueue(destinationName);
 
-                while(received < nbMessages) 
+                while(received < nbMessages)
                 {
-	                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	                IDestination destination = session.GetQueue(destinationName);
 	                IMessageConsumer consumer = session.CreateConsumer(destination);
 	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
-	                
-                    if(msg != null) 
+	                if(msg != null)
                     {
-	                    if(msg != null && rolledback.Contains(msg.Text)) 
+                        if(msg != null && rolledback.Contains(msg.Text))
                         {
-                            Interlocked.Increment(ref received);                        
   
-	                        Console.WriteLine("Received message " + msg.Text + " (" + received
+ ")" + msg.NMSMessageId);
+                            Interlocked.Increment(ref received);
 	                        Assert.IsTrue(msg.NMSRedelivered);
-	                        Assert.AreEqual(2, msg.Properties.GetLong("NMSXDeliveryCount"));
 	                        session.Commit();
-	                    } 
-                        else 
+	                    }
+                        else
                         {
-	                        Console.WriteLine("Rollback message " + msg.Text + " id: " +  msg.NMSMessageId);
-	                        Assert.IsFalse(msg.NMSRedelivered);
                             rolledback.Add(msg.Text, (Boolean) true);
 	                        session.Rollback();
 	                    }
 	                }
 	                consumer.Close();
-	                session.Close();
 	            }
+	            session.Close();
 	        }
 	    }
-   
-//	    public void testRedeliveryOnSingleConsumer() {
-//	
-//	        ConnectionFactory connectionFactory = 
-//	            new ActiveMQConnectionFactory(brokerUrl);
-//	        Connection connection = connectionFactory.createConnection();
-//	        connection.Start();
-//	
-//	        populateDestinationWithInterleavedProducer(nbMessages, destinationName, connection);
-//	
-//	        // Consume messages and Rollback transactions
-//	        {
-//	            AtomicInteger received = new AtomicInteger();
-//	            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>();
-//	            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-//	            IDestination destination = session.GetQueue(destinationName);
-//	            IMessageConsumer consumer = session.CreateConsumer(destination);         
  
-//	            while (received.get() < nbMessages) {
-//	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
-//	                if (msg != null) {
-//	                    if (msg != null && rolledback.put(msg.Text, Boolean.TRUE)
!= null) {
-//	                        LOG.info("Received message " + msg.Text + " (" + received.getAndIncrement()
+ ")" + msg.NMSMessageId);
-//	                        assertTrue(msg.NMSRedelivered());
-//	                        session.Commit();
-//	                    } else {
-//	                        LOG.info("Rollback message " + msg.Text + " id: " +  msg.NMSMessageId);
-//	                        session.Rollback();
-//	                    }
-//	                }
-//	            }
-//	            consumer.Close();
-//	            session.Close();
-//	        }
-//	    }
-	    
-//        [Test]        
-//	    public void TestRedeliveryOnSingleSession() 
-//        {
-//	        connection.Start();
-//	
-//	        PopulateDestination(nbMessages, destinationName);
-//	
-//	        // Consume messages and Rollback transactions
-//	        {
-//                int received = 0;
-//                IDictionary rolledback = Hashtable.Synchronized(new Hashtable());
-//	            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-//	            IDestination destination = session.GetQueue(destinationName);
-//	            
-//                while(received < nbMessages) 
-//                {
-//	                IMessageConsumer consumer = session.CreateConsumer(destination);     
      
-//	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(6000000));
-//	                if(msg != null) 
-//                    {
-//                        if(msg != null && rolledback.Contains(msg.Text)) 
-//                        {
-//                            Interlocked.Increment(ref received);
-//                            Console.WriteLine("Received Message " + msg.Text + "(" + received
+ ") " + msg.NMSMessageId);
-//	                        Assert.IsTrue(msg.NMSRedelivered);
-//	                        session.Commit();
-//	                    } 
-//                        else 
-//                        {
-//                            Console.WriteLine("Rollback message " + msg.Text + " id: "
+  msg.NMSMessageId);                            
-//                            rolledback.Add(msg.Text, (Boolean) true);
-//	                        session.Rollback();
-//	                    }
-//	                }
-//	                consumer.Close();
-//	            }
-//	            session.Close();
-//	        }
-//	    }
-	    
-		[Test]
-	    public void TestValidateRedeliveryCountOnRollback() 
-		{
-	        const int numMessages = 1;
-	        connection.Start();
-	
-	        PopulateDestination(numMessages, destinationName);
-	
-	        {
+
+        [Test]
+        public void TestValidateRedeliveryCountOnRollback()
+        {
+            const int numMessages = 1;
+            connection.Start();
+
+            PopulateDestination(numMessages, destinationName);
+
+            {
                 int received = 0;
                 int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
 
-                while(received < maxRetries) 
-    			{
+                while(received < maxRetries)
+                {
                     ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	                IDestination destination = session.GetQueue(destinationName);
-	
-                    IMessageConsumer consumer = session.CreateConsumer(destination);    
       
+                    IDestination destination = session.GetQueue(destinationName);
+
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
                     ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
-                    
-                    if(msg != null) 
+
+                    if(msg != null)
                     {
                         Interlocked.Increment(ref received);
-                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),

+                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),
                                         "redelivery property matches deliveries");
                         session.Rollback();
                     }
                     session.Close();
                 }
-                
+
                 ConsumeMessage(maxRetries + 1);
             }
-	    }
+        }
 
-		[Test]
-	    public void TestValidateRedeliveryCountOnRollbackWithPrefetch0() 
-		{	
-			const int numMessages = 1;
-			(connection as Connection).PrefetchPolicy.SetAll(0);
-	        connection.Start();
-	
-	        PopulateDestination(numMessages, destinationName);
-	
-	        {
-	            int received = 0;
-	            int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
-				
-	            while(received < maxRetries) 
-				{
-	                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	                IDestination destination = session.GetQueue(destinationName);
-	
-	                IMessageConsumer consumer = session.CreateConsumer(destination);       
    
-	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
-	                
-					if(msg != null) 
-					{
-						Interlocked.Increment(ref received);
-	                    Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),

-						                "redelivery property matches deliveries");
-	                    session.Rollback();
-	                }
-	                session.Close();
-	            }
-	            
-	            ConsumeMessage(maxRetries + 1);
-	        }
-	    }
-	
-	    private void ConsumeMessage(int deliveryCount) 
-		{
-	        ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	        IDestination destination = session.GetQueue(destinationName);
-	        IMessageConsumer consumer = session.CreateConsumer(destination);            
-	        ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(1000));
-	        Assert.IsNotNull(msg);
-	        Assert.AreEqual(deliveryCount, msg.Properties.GetLong("NMSXDeliveryCount"),
-			                "redelivery property matches deliveries");
-	        session.Commit();
-	        session.Close();
-	    }
-	
-		[Test]
-	    public void TestRedeliveryPropertyWithNoRollback() 
-		{
-	        const int numMessages = 1;
-	
-	        PopulateDestination(numMessages, destinationName);
-	        connection.Close();
-	        
-	        {
-	            int received = 0;
-	            
-				int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
-	            
-				while(received < maxRetries) 
-				{
-	                connection = CreateConnection();
-	                connection.Start();
-	                ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
-	                IDestination destination = session.GetQueue(destinationName);
-	
-	                IMessageConsumer consumer = session.CreateConsumer(destination);       
    
-	                ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
-	                
-					if(msg != null) 
-					{
-						Interlocked.Increment(ref received);
-	                    Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),

-						                "redelivery property matches deliveries");
-	                }
-					
-	                session.Close();
-	                connection.Close();
-	            }
-				
-	            connection = CreateConnection();
-	            connection.Start();
-	            ConsumeMessage(maxRetries + 1);
-	        }
-	    }
-	    
-	    private void PopulateDestination(int nbMessages, string destinationName) 
-		{
-	        ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-	        IDestination destination = session.GetQueue(destinationName);
-	        IMessageProducer producer = session.CreateProducer(destination);
-	        
-			for(int i = 1; i <= nbMessages; i++) 
-			{
-	            producer.Send(session.CreateTextMessage("<hello id='" + i + "'/>"));
-	        }
-			
-	        producer.Close();
-	        session.Close();
-	    }
-	
-	    private void PopulateDestinationWithInterleavedProducer(int nbMessages, string destinationName)

-		{
-	        ISession session1 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-	        IDestination destination1 = session1.GetQueue(destinationName);
-	        IMessageProducer producer1 = session1.CreateProducer(destination1);
-			producer1.DeliveryMode = MsgDeliveryMode.NonPersistent;
-	        
-			ISession session2 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
-	        IDestination destination2 = session2.GetQueue(destinationName);
-	        IMessageProducer producer2 = session2.CreateProducer(destination2);
+        [Test]
+        public void TestValidateRedeliveryCountOnRollbackWithPrefetch0()
+        {
+            const int numMessages = 1;
+            (connection as Connection).PrefetchPolicy.SetAll(0);
+            connection.Start();
+
+            PopulateDestination(numMessages, destinationName);
+
+            {
+                int received = 0;
+                int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
+
+                while(received < maxRetries)
+                {
+                    ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                    IDestination destination = session.GetQueue(destinationName);
+
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
+
+                    if(msg != null)
+                    {
+                        Interlocked.Increment(ref received);
+                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),
+                                        "redelivery property matches deliveries");
+                        session.Rollback();
+                    }
+                    session.Close();
+                }
+
+                ConsumeMessage(maxRetries + 1);
+            }
+        }
+
+        private void ConsumeMessage(int deliveryCount)
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+            IDestination destination = session.GetQueue(destinationName);
+            IMessageConsumer consumer = session.CreateConsumer(destination);
+            ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(1000));
+            Assert.IsNotNull(msg);
+            Assert.AreEqual(deliveryCount, msg.Properties.GetLong("NMSXDeliveryCount"),
+                            "redelivery property matches deliveries");
+            session.Commit();
+            session.Close();
+        }
+
+        [Test]
+        public void TestRedeliveryPropertyWithNoRollback()
+        {
+            const int numMessages = 1;
+
+            PopulateDestination(numMessages, destinationName);
+            connection.Close();
+
+            {
+                int received = 0;
+
+                int maxRetries = new RedeliveryPolicy().MaximumRedeliveries;
+
+                while(received < maxRetries)
+                {
+                    connection = CreateConnection();
+                    connection.Start();
+                    ISession session = connection.CreateSession(AcknowledgementMode.Transactional);
+                    IDestination destination = session.GetQueue(destinationName);
+
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    ITextMessage msg = (ITextMessage) consumer.Receive(TimeSpan.FromMilliseconds(2000));
+
+                    if(msg != null)
+                    {
+                        Interlocked.Increment(ref received);
+                        Assert.AreEqual(received, msg.Properties.GetLong("NMSXDeliveryCount"),
+                                        "redelivery property matches deliveries");
+                    }
+
+                    session.Close();
+                    connection.Close();
+                }
+
+                connection = CreateConnection();
+                connection.Start();
+                ConsumeMessage(maxRetries + 1);
+            }
+        }
+
+        private void PopulateDestination(int nbMessages, string destinationName)
+        {
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IDestination destination = session.GetQueue(destinationName);
+            IMessageProducer producer = session.CreateProducer(destination);
+
+            for(int i = 1; i <= nbMessages; i++)
+            {
+                producer.Send(session.CreateTextMessage("<hello id='" + i + "'/>"));
+            }
+
+            producer.Close();
+            session.Close();
+        }
+
+        private void PopulateDestinationWithInterleavedProducer(int nbMessages, string destinationName)
+        {
+            ISession session1 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IDestination destination1 = session1.GetQueue(destinationName);
+            IMessageProducer producer1 = session1.CreateProducer(destination1);
+            producer1.DeliveryMode = MsgDeliveryMode.NonPersistent;
+
+            ISession session2 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            IDestination destination2 = session2.GetQueue(destinationName);
+            IMessageProducer producer2 = session2.CreateProducer(destination2);
             producer2.DeliveryMode = MsgDeliveryMode.NonPersistent;
 
-	        for(int i = 1; i <= nbMessages; i++) 
-			{
-	            if(i%2 == 0)
-				{
-	                producer1.Send(session1.CreateTextMessage("<hello id='" + i + "'/>"));
-	            }
-				else 
-				{
-	                producer2.Send(session2.CreateTextMessage("<hello id='" + i + "'/>"));
-	            }
-	        }
-			
-	        producer1.Close();
-	        session1.Close();
-	        producer2.Close();
-	        session2.Close();
-	    }
-		
-	}
+            for(int i = 1; i <= nbMessages; i++)
+            {
+                if(i%2 == 0)
+                {
+                    producer1.Send(session1.CreateTextMessage("<hello id='" + i + "'/>"));
+                }
+                else
+                {
+                    producer2.Send(session2.CreateTextMessage("<hello id='" + i + "'/>"));
+                }
+            }
+
+            producer1.Close();
+            session1.Close();
+            producer2.Close();
+            session2.Close();
+        }
+
+    }
 }



Mime
View raw message