activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1040615 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/ main/csharp/State/ main/csharp/Transport/Failover/ test/csharp/Transport/failover/
Date Tue, 30 Nov 2010 16:25:54 GMT
Author: tabish
Date: Tue Nov 30 16:25:54 2010
New Revision: 1040615

URL: http://svn.apache.org/viewvc?rev=1040615&view=rev
Log:
Some work on: https://issues.apache.org/jira/browse/AMQNET-293

Adds new test in FailoverTransportTest to cover connection drop during OnMessage.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=1040615&r1=1040614&r2=1040615&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Tue Nov 30 16:25:54 2010
@@ -842,6 +842,8 @@ namespace Apache.NMS.ActiveMQ
                 Tracer.Debug("transport interrupted, dispatchers: " + dispatchers.Count);
             }
 
+            SignalInterruptionProcessingNeeded();
+
             foreach(Session session in this.sessions)
             {
                 try
@@ -1001,6 +1003,7 @@ namespace Apache.NMS.ActiveMQ
                 {
                     Tracer.Debug("transportInterruptionProcessingComplete for: " + this.info.ConnectionId);
                 }
+
                 this.transportInterruptionProcessingComplete = null;
 
                 FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
@@ -1013,9 +1016,22 @@ namespace Apache.NMS.ActiveMQ
                                      ") of interruption completion for: " + this.info.ConnectionId);
                     }
                 }
-
             }
         }
 
+        private void SignalInterruptionProcessingNeeded()
+        {
+            FailoverTransport failoverTransport = transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+
+            if(failoverTransport != null)
+            {
+                failoverTransport.StateTracker.TransportInterrupted(this.info.ConnectionId);
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("notified failover transport (" + failoverTransport +
+                                 ") of pending interruption processing for: " + this.info.ConnectionId);
+                }
+            }
+        }
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1040615&r1=1040614&r2=1040615&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Tue Nov 30 16:25:54 2010
@@ -718,11 +718,12 @@ namespace Apache.NMS.ActiveMQ.State
             }
         }
 
-        public void TransportInterrupted()
+        public void TransportInterrupted(ConnectionId id)
         {
-            foreach(ConnectionState connectionState in connectionStates.Values)
+            ConnectionState connection = connectionStates[id];
+            if(connection != null)
             {
-                connectionState.ConnectionInterruptProcessingComplete = false;
+                connection.ConnectionInterruptProcessingComplete = false;
             }
         }
     }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1040615&r1=1040614&r2=1040615&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Tue Nov 30 16:25:54 2010
@@ -292,6 +292,11 @@ namespace Apache.NMS.ActiveMQ.Transport.
 			set { asyncTimeout = value; }
 		}
 
+        public ConnectionStateTracker StateTracker
+        {
+            get { return this.stateTracker; }
+        }
+
 		#endregion
 
 		public bool IsFaultTolerant
@@ -384,8 +389,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
 					ConnectedTransportURI = null;
 					connected = false;
 
-					stateTracker.TransportInterrupted();
-
 					if(this.Interrupted != null)
 					{
 						this.Interrupted(transport);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs?rev=1040615&r1=1040614&r2=1040615&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransportTest.cs Tue Nov 30 16:25:54 2010
@@ -18,9 +18,14 @@
 using System;
 using System.Collections.Generic;
 using System.Threading;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
 using Apache.NMS.ActiveMQ.Transport.Failover;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
 using Apache.NMS.ActiveMQ.Transport.Mock;
 using NUnit.Framework;
 
@@ -33,7 +38,14 @@ namespace Apache.NMS.ActiveMQ.Test
 		private List<Command> received;
 		private List<Exception> exceptions;
 
-		int sessionIdx = 1;
+        private const int MESSAGE_COUNT = 5;
+        private Connection connection;
+        private int msgCount = 5;
+        private bool interrupted = false;
+        private bool resumed = false;
+        protected AutoResetEvent semaphore = new AutoResetEvent(false);
+
+        int sessionIdx = 1;
 		int consumerIdx = 1;
 		int producerIdx = 1;
 
@@ -85,6 +97,10 @@ namespace Apache.NMS.ActiveMQ.Test
 			sessionIdx = 1;
 			consumerIdx = 1;
 			producerIdx = 1;
+            this.connection = null;
+            this.msgCount = MESSAGE_COUNT;
+            this.interrupted = false;
+            this.resumed = false;
 		}
 
 		[Test]
@@ -633,5 +649,109 @@ namespace Apache.NMS.ActiveMQ.Test
 		{
 			transport.Oneway(new RemoveInfo() { ObjectId = producer.ProducerId });
 		}
+
+        [Test]
+        public void FailoverTransportFailOnProcessingReceivedMessageTest()
+        {
+            string uri = "failover:(tcp://${activemqhost}:61616)";
+            IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+            using(connection = factory.CreateConnection() as Connection )
+            {
+                connection.ConnectionInterruptedListener +=
+                    new ConnectionInterruptedListener(TransportInterrupted);
+                connection.ConnectionResumedListener +=
+                    new ConnectionResumedListener(TransportResumed);
+
+                connection.Start();
+                using(ISession session = connection.CreateSession())
+                {
+                    IDestination destination = session.GetQueue("Test?consumer.prefetchSize=1");
+                    PurgeQueue(connection, destination);
+                    PutMsgIntoQueue(session, destination);
+
+                    using(IMessageConsumer consumer = session.CreateConsumer(destination))
+                    {
+                        consumer.Listener += OnMessage;
+                        BreakConnection();
+                        WaitForMessagesToArrive();
+                    }
+                }
+            }
+
+            Assert.IsTrue(this.interrupted);
+            Assert.IsTrue(this.resumed);
+        }
+
+        public void TransportInterrupted()
+        {
+            this.interrupted = true;
+        }
+
+        public void TransportResumed()
+        {
+            this.resumed = true;
+        }
+
+        public void OnMessage(IMessage message)
+        {
+            var textMsg = message as ITextMessage;
+
+            if(textMsg == null)
+            {
+                return;
+            }
+
+            msgCount--;
+
+            // just process the first message for 10 seconds to give some time main thread
+            // to restart ActiveMq broker
+            if(msgCount == MESSAGE_COUNT - 1)
+            {
+                Thread.Sleep(10000);
+            }
+
+            if(msgCount == 0)
+            {
+                // if all messages were consumed then we are fine
+                semaphore.Set();
+            }
+        }
+
+        private void PutMsgIntoQueue(ISession session, IDestination destination)
+        {
+            using(IMessageProducer producer = session.CreateProducer(destination))
+            {
+                ITextMessage message = session.CreateTextMessage();
+                for(int i = 0; i < msgCount; ++i)
+                {
+                    message.Text = "Test message " + (i + 1);
+                    producer.Send(message);
+                }
+            }
+        }
+
+        public void PurgeQueue(IConnection conn, IDestination queue)
+        {
+            ISession session = conn.CreateSession();
+            IMessageConsumer consumer = session.CreateConsumer(queue);
+            while(consumer.Receive(TimeSpan.FromMilliseconds(500)) != null)
+            {
+            }
+            consumer.Close();
+            session.Close();
+        }
+
+        private void BreakConnection()
+        {
+            TcpTransport transport = this.connection.ITransport.Narrow(typeof(TcpTransport)) as TcpTransport;
+            Assert.IsNotNull(transport);
+            transport.Close();
+        }
+
+        protected void WaitForMessagesToArrive()
+        {
+            semaphore.WaitOne(30000, true);
+            Assert.AreEqual(0, msgCount);
+        }
 	}
 }



Mime
View raw message