activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1204733 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Connection.cs test/csharp/Transport/failover/FailoverTransactionTest.cs
Date Mon, 21 Nov 2011 22:35:05 GMT
Author: tabish
Date: Mon Nov 21 22:35:04 2011
New Revision: 1204733

URL: http://svn.apache.org/viewvc?rev=1204733&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-352

includes unit test. 

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1204733&r1=1204732&r2=1204733&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Mon Nov 21 22:35:04 2011
@@ -1046,8 +1046,12 @@ namespace Apache.NMS.ActiveMQ
 		{
 			Tracer.Debug("Connection: Transport has been Interrupted.");
 
-			this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.Count);
-			if(Tracer.IsDebugEnabled)
+            // Ensure that if there's an advisory consumer we don't add it to the
+            // set of consumers that need interruption processing.
+			this.transportInterruptionProcessingComplete =
+                new CountDownLatch(dispatchers.Count - (this.advisoryConsumer != null ? 1 : 0));
+
+            if(Tracer.IsDebugEnabled)
 			{
 				Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
 			}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs?rev=1204733&r1=1204732&r2=1204733&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs Mon Nov 21 22:35:04 2011
@@ -29,53 +29,137 @@ using NUnit.Framework;
 namespace Apache.NMS.ActiveMQ.Test
 {
     [TestFixture]
-    public class FailoverTransactionTest
+    public class FailoverTransactionTest : NMSTestSupport
     {
         private Connection connection;
         private bool interrupted = false;
         private bool resumed = false;
+        private bool commitFailed = false;
 
-//        [Test]
-//        public void FailoverBeforeCommitSentTest()
-//        {
-//            string uri = "failover:(tcp://${activemqhost}:61616)";
-//            IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
-//            using(connection = factory.CreateConnection() as Connection)
-//            {
-//                connection.ConnectionInterruptedListener +=
-//                    new ConnectionInterruptedListener(TransportInterrupted);
-//                connection.ConnectionResumedListener +=
-//                    new ConnectionResumedListener(TransportResumed);
-//
-//                connection.Start();
-//                using(ISession session = connection.CreateSession())
-//                {
-//                    IDestination destination = session.GetQueue("Test?consumer.prefetchSize=1");
-//                    PurgeQueue(connection, destination);
-//                    PutMsgIntoQueue(session, destination);
-//
-//                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
-//                    {
-//                        consumer.Listener += OnMessage;
-//                        BreakConnection();
-//                        WaitForMessagesToArrive();
-//                    }
-//                }
-//            }
-//
-//            Assert.IsTrue(this.interrupted);
-//            Assert.IsTrue(this.resumed);
-//        }
-//
-//        public void TransportInterrupted()
-//        {
-//            this.interrupted = true;
-//        }
-//
-//        public void TransportResumed()
-//        {
-//            this.resumed = true;
-//        }
+        private readonly int MSG_COUNT = 2;
+        private readonly String destinationName = "FailoverTransactionTestQ";
+
+        [Test]
+        public void FailoverBeforeCommitSentTest()
+        {
+            Tracer.Trace = new NmsConsoleTracer();
+
+            string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
+            IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+            using(connection = factory.CreateConnection() as Connection)
+            {
+                connection.ConnectionInterruptedListener +=
+                    new ConnectionInterruptedListener(TransportInterrupted);
+                connection.ConnectionResumedListener +=
+                    new ConnectionResumedListener(TransportResumed);
+
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+                using(ISession session = connection.CreateSession())
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    PurgeQueue(connection, destination);
+                }
+
+                Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    PutMsgIntoQueue(session, destination);
+                }
+
+                Assert.IsTrue(this.interrupted);
+                Assert.IsTrue(this.resumed);
+
+                Tracer.Debug("Test is attempting to read " + MSG_COUNT +
+                             " messages from the queue: " + destinationName);
+
+                using(ISession session = connection.CreateSession())
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    for (int i = 0; i < MSG_COUNT; ++i)
+                    {
+                        IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5));
+                        Assert.IsNotNull(msg, "Should receive message[" + (i + 1) + "] after commit failed once.");
+                    }
+                }
+            }
+
+            Assert.IsTrue(this.interrupted);
+            Assert.IsTrue(this.resumed);
+        }
+
+        public void TransportInterrupted()
+        {
+            this.interrupted = true;
+        }
+
+        public void TransportResumed()
+        {
+            this.resumed = true;
+        }
+
+        private void PutMsgIntoQueue(ISession session, IDestination destination)
+        {
+            using(IMessageProducer producer = session.CreateProducer(destination))
+            {
+                ITextMessage message = session.CreateTextMessage();
+                for(int i = 0; i < MSG_COUNT; ++i)
+                {
+                    message.Text = "Test message " + (i + 1);
+                    producer.Send(message);
+                }
+
+                if (session.Transacted)
+                {
+                    session.Commit();
+                }
+            }
+        }
+
+        public void PurgeQueue(IConnection conn, IDestination queue)
+        {
+            using(ISession session = conn.CreateSession())
+            {
+                using(IMessageConsumer consumer = session.CreateConsumer(queue))
+                while(consumer.Receive(TimeSpan.FromMilliseconds(500)) != null)
+                {
+                }
+            }
+        }
+
+        private void BreakConnection()
+        {
+            TcpTransport transport = this.connection.ITransport.Narrow(typeof(TcpTransport)) as TcpTransport;
+            Assert.IsNotNull(transport);
+            transport.Close();
+        }
+
+        public void FailOnCommitTransportHook(ITransport transport, Command command)
+        {
+            if (commitFailed)
+            {
+                return;
+            }
+
+            if (command is TransactionInfo)
+            {
+                TransactionInfo txInfo = command as TransactionInfo;
+                if (txInfo.Type == (byte)TransactionType.CommitOnePhase)
+                {
+                    Tracer.Debug("Closing the TcpTransport to simulate an connection drop.");
+                    commitFailed = true;
+                    (transport as TcpTransport).Close();
+                }
+            }
+        }
 
     }
 }



Mime
View raw message