activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r892942 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Transport/ main/csharp/Transport/Mock/ test/csharp/Transport/Inactivity/
Date Mon, 21 Dec 2009 19:48:53 GMT
Author: tabish
Date: Mon Dec 21 19:48:53 2009
New Revision: 892942

URL: http://svn.apache.org/viewvc?rev=892942&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-212

Add unit tests for the inactivity monitor transport and update the MockTransport as needed
for the tests.  Fixed an issue in the InactivityMonitor where the commandReceived flag wasn't
getting cleared.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs
  (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=892942&r1=892941&r2=892942&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/InactivityMonitor.cs
Mon Dec 21 19:48:53 2009
@@ -47,12 +47,30 @@
         private ReadChecker readChecker;
 
         private long readCheckTime;
+        public long ReadCheckTime
+        {
+            get { return this.readCheckTime; }
+            set { this.readCheckTime = value; }
+        }
+
         private long writeCheckTime;
+        public long WriteCheckTime
+        {
+            get { return this.writeCheckTime; }
+            set { this.writeCheckTime = value; }
+        }
+
         private long initialDelayTime;
+        public long InitialDelayTime
+        {
+            get { return this.initialDelayTime; }
+            set { this.initialDelayTime = value; }
+        }
 
         private Atomic<bool> keepAliveResponseRequired = new Atomic<bool>(false);
         public bool KeepAliveResponseRequired
         {
+            get { return this.keepAliveResponseRequired.Value; }
             set { keepAliveResponseRequired.Value = value; }
         }
 
@@ -76,11 +94,12 @@
         /// </summary>
         public void WriteCheck()
         {
-            if (inWrite.Value)
+            if(inWrite.Value)
             {
                 return;
             }
-            if (!commandSent.Value)
+
+            if(!commandSent.Value)
             {
                 Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
                 ThreadPool.QueueUserWorkItem(new WaitCallback(SendKeepAlive));
@@ -89,12 +108,13 @@
             {
                 Tracer.Debug("Message sent since last write check. Resetting flag");
             }
+
             commandSent.Value = false;
         }
 
         private void SendKeepAlive(object state)
         {
-            if (monitorStarted.Value)
+            if(monitorStarted.Value)
             {
                 try
                 {
@@ -102,7 +122,7 @@
                     info.ResponseRequired = keepAliveResponseRequired.Value;
                     Oneway(info);
                 }
-                catch (IOException exception)
+                catch(IOException exception)
                 {
                     OnException(this, exception);
                 }
@@ -113,19 +133,20 @@
         #region ReadCheck Related
         public void ReadCheck()
         {
-            if (inRead.Value)
+            if(inRead.Value)
             {
                 Tracer.Debug("A receive is in progress");
                 return;
             }
-            if (!commandReceived.Value)
+
+            if(!commandReceived.Value)
             {
                 Tracer.Debug("No message received since last read check! Sending an InactivityException!");
                 ThreadPool.QueueUserWorkItem(new WaitCallback(SendInactivityException));
             }
             else
             {
-                commandReceived.Value = true;
+                commandReceived.Value = false;
             }
         }
 
@@ -158,32 +179,32 @@
             inRead.Value = true;
             try
             {
-                if (command is KeepAliveInfo)
+                if(command is KeepAliveInfo)
                 {
                     KeepAliveInfo info = command as KeepAliveInfo;
-                    if (info.ResponseRequired)
+                    if(info.ResponseRequired)
                     {
                         try
                         {
                             info.ResponseRequired = false;
                             Oneway(info);
                         }
-                        catch (IOException ex)
+                        catch(IOException ex)
                         {
                             OnException(this, ex);
                         }
                     }
                 }
-                else if (command is WireFormatInfo)
+                else if(command is WireFormatInfo)
                 {
-                    lock (monitor)
+                    lock(monitor)
                     {
                         remoteWireFormatInfo = command as WireFormatInfo;
                         try
                         {
                             StartMonitorThreads();
                         }
-                        catch (IOException ex)
+                        catch(IOException ex)
                         {
                             OnException(this, ex);
                         }
@@ -203,18 +224,18 @@
             //synchronize this method - its not synchronized
             //further down the transport stack and gets called by more 
             //than one thread  by this class
-            lock (inWrite)
+            lock(inWrite)
             {
                 inWrite.Value = true;
                 try
                 {
-                    if (failed.Value)
+                    if(failed.Value)
                     {
                         throw new IOException("Channel was inactive for too long: " + next.RemoteAddress.ToString());
                     }
-                    if (command is WireFormatInfo)
+                    if(command is WireFormatInfo)
                     {
-                        lock (monitor)
+                        lock(monitor)
                         {
                             localWireFormatInfo = command as WireFormatInfo;
                             StartMonitorThreads();
@@ -232,7 +253,7 @@
 
         protected override void OnException(ITransport sender, Exception command)
         {
-            if (failed.CompareAndSet(false, true))
+            if(failed.CompareAndSet(false, true))
             {
                 Tracer.Debug("Exception received in the Inactivity Monitor: " + command.ToString());
                 StopMonitorThreads();
@@ -242,17 +263,17 @@
 
         private void StartMonitorThreads()
         {
-            lock (monitor)
+            lock(monitor)
             {
-                if (monitorStarted.Value)
+                if(monitorStarted.Value)
                 {
                     return;
                 }
-                if (localWireFormatInfo == null)
+                if(localWireFormatInfo == null)
                 {
                     return;
                 }
-                if (remoteWireFormatInfo == null)
+                if(remoteWireFormatInfo == null)
                 {
                     return;
                 }
@@ -266,7 +287,7 @@
                         localWireFormatInfo.MaxInactivityDurationInitialDelay,
                         remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
 
-                if (readCheckTime > 0)
+                if(readCheckTime > 0)
                 {
                     monitorStarted.Value = true;
                     writeChecker = new WriteChecker(this);
@@ -292,9 +313,9 @@
 
         private void StopMonitorThreads()
         {
-            lock (monitor)
+            lock(monitor)
             {
-                if (monitorStarted.CompareAndSet(true, false))
+                if(monitorStarted.CompareAndSet(true, false))
                 {
                     readCheckTimer.Dispose();
                     writeCheckTimer.Dispose();
@@ -309,12 +330,14 @@
 
         public WriteChecker(InactivityMonitor parent)
         {
-            if (parent == null)
+            if(parent == null)
             {
                 throw new NullReferenceException("WriteChecker created with a NULL parent.");
             }
+
             this.parent = parent;
         }
+
         public void Check(object state)
         {
             this.parent.WriteCheck();
@@ -328,17 +351,18 @@
 
         public ReadChecker(InactivityMonitor parent)
         {
-            if (parent == null)
+            if(parent == null)
             {
                 throw new NullReferenceException("ReadChecker created with a null parent");
             }
             this.parent = parent;
         }
+
         public void Check(object state)
         {
             long now = DateUtils.ToJavaTimeUtc(DateTime.UtcNow);
             long elapsed = now - lastRunTime;
-            if (!parent.AllowReadCheck(elapsed))
+            if(!parent.AllowReadCheck(elapsed))
             {
                 return;
             }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=892942&r1=892941&r2=892942&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs
Mon Dec 21 19:48:53 2009
@@ -44,6 +44,9 @@
         private bool failOnReceiveMessage = false;
         private int numReceivedMessagesBeforeFail = 0;
         private int numReceivedMessages = 0;
+        private bool failOnKeepAliveInfoSends = false;
+        private int numSentKeepAliveInfosBeforeFail = 0;
+        private int numSentKeppAliveInfos = 0;
         private int nextCommandId = 0;
         private CommandHandler commandHandler;
         private CommandHandler outgoingCommandHandler;
@@ -150,16 +153,25 @@
         public void Oneway(Command command)
         {
             Tracer.Debug("MockTransport sending oneway Command: " + command.ToString() );
-            
+
             if( command.IsMessage ) {
                 this.numSentMessages++;
-    
+
                 if( this.failOnSendMessage && this.numSentMessages > this.numSentMessagesBeforeFail
) {
                     Tracer.Debug("MockTransport Oneway send, failing as per configuration."
);
                     throw new IOException( "Failed to Send Message.");
                 }
             }
-    
+
+            if( command.IsKeepAliveInfo ) {
+                this.numSentKeppAliveInfos++;
+
+                if( this.failOnKeepAliveInfoSends && this.numSentKeppAliveInfos >
this.numSentKeepAliveInfosBeforeFail ) {
+                    Tracer.Debug("MockTransport Oneway send, failing as per configuration."
);
+                    throw new IOException( "Failed to Send Message.");
+                }
+            }
+            
             // Process and send any new Commands back.
 
             // Let the Response Builder give us the Commands to send to the Client App.
@@ -324,19 +336,37 @@
 			get { return failOnReceiveMessage; }
 			set { failOnReceiveMessage = value; }
 		}
-		
+
 		public int NumReceivedMessagesBeforeFail
 		{
 			get { return numReceivedMessagesBeforeFail; }
-			set { numReceivedMessagesBeforeFail = value; }			
+			set { numReceivedMessagesBeforeFail = value; }
 		}
-		
+
 		public int NumReceivedMessages
 		{
 			get { return numReceivedMessages; }
 			set { numReceivedMessages = value; }
 		}
 
+        public bool FailOnKeepAliveInfoSends
+        {
+            get { return failOnKeepAliveInfoSends; }
+            set { failOnKeepAliveInfoSends = value; }
+        }
+
+        public int NumSentKeepAliveInfosBeforeFail
+        {
+            get { return numSentKeepAliveInfosBeforeFail; }
+            set { numSentKeepAliveInfosBeforeFail = value; }
+        }
+
+        public int NumSentKeppAliveInfos
+        {
+            get { return numSentKeppAliveInfos; }
+            set { numSentKeppAliveInfos = value; }
+        }
+
         public bool IsFaultTolerant
         {
             get{ return false; }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs?rev=892942&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs
Mon Dec 21 19:48:53 2009
@@ -0,0 +1,155 @@
+// /*
+//  * Licensed to the Apache Software Foundation (ASF) under one or more
+//  * contributor license agreements.  See the NOTICE file distributed with
+//  * this work for additional information regarding copyright ownership.
+//  * The ASF licenses this file to You under the Apache License, Version 2.0
+//  * (the "License"); you may not use this file except in compliance with
+//  * the License.  You may obtain a copy of the License at
+//  *
+//  *     http://www.apache.org/licenses/LICENSE-2.0
+//  *
+//  * Unless required by applicable law or agreed to in writing, software
+//  * distributed under the License is distributed on an "AS IS" BASIS,
+//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  * See the License for the specific language governing permissions and
+//  * limitations under the License.
+//  */
+// 
+
+using System;
+using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
+using Apache.NMS;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Mock;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.OpenWire;
+using NUnit.Framework;
+using NUnit.Framework.Extensions;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+    public class InactivityMonitorTest
+    {
+        private List<Command> received;
+        private List<Exception> exceptions;
+        private MockTransport transport = null;
+        private WireFormatInfo localWireFormatInfo = null;
+
+        public void OnException(ITransport transport, Exception exception)
+        {
+            Tracer.Debug("Test: Received Exception from Transport: " + exception );
+            exceptions.Add( exception );
+        }
+
+        public void OnCommand(ITransport transport, Command command)
+        {
+            Tracer.Debug("Test: Received Command from Transport: " + command );
+            received.Add( command );
+        }
+
+        [SetUp]
+        public void SetUp()
+        {
+            this.received = new List<Command>();
+            this.exceptions = new List<Exception>();
+
+            Uri uri = new Uri("mock://mock?wireformat=openwire");
+            MockTransportFactory factory = new MockTransportFactory();
+
+            this.transport = factory.CompositeConnect( uri ) as MockTransport;
+
+            this.localWireFormatInfo = new WireFormatInfo();
+
+            this.localWireFormatInfo.Version = 5;
+            this.localWireFormatInfo.MaxInactivityDuration = 3000;
+            this.localWireFormatInfo.TightEncodingEnabled = false;
+        }
+
+        [Test]
+        public void TestCreate()
+        {
+            InactivityMonitor monitor = new InactivityMonitor( this.transport );
+
+            Assert.IsTrue( monitor.InitialDelayTime == 0 );
+            Assert.IsTrue( monitor.ReadCheckTime == 0 );
+            Assert.IsTrue( monitor.WriteCheckTime == 0 );
+            Assert.IsTrue( monitor.KeepAliveResponseRequired == false );
+            Assert.IsTrue( monitor.IsDisposed == false );
+        }
+
+        [Test]
+        public void TestReadTimeout()
+        {
+            InactivityMonitor monitor = new InactivityMonitor( this.transport );
+
+            monitor.Exception += new ExceptionHandler(OnException);
+
+            // Send the local one for the monitor to record.
+            monitor.Oneway( this.localWireFormatInfo );
+
+            Thread.Sleep( 2000 );
+
+            // Should not have timed out on Read yet.
+            Assert.IsTrue( this.exceptions.Count == 0 );
+
+            Thread.Sleep( 5000 );
+
+            // Channel should have been inactive for to long.
+            Assert.IsTrue( this.exceptions.Count > 0 );
+        }
+
+        [Test]
+        public void TestWriteMessageFail()
+        {
+            this.transport.FailOnKeepAliveInfoSends = true ;
+            this.transport.NumSentKeepAliveInfosBeforeFail = 4;
+
+            InactivityMonitor monitor = new InactivityMonitor( this.transport );
+
+            monitor.Exception += new ExceptionHandler(OnException);
+
+            // Send the local one for the monitor to record.
+            monitor.Oneway( this.localWireFormatInfo );
+
+            Thread.Sleep( 2000 );
+
+            ActiveMQMessage message = new ActiveMQMessage();
+            this.transport.InjectCommand( message );
+
+            // Should not have timed out on Read yet.
+            Assert.IsTrue( this.exceptions.Count == 0 );
+
+            Thread.Sleep( 8000 );
+
+            // Channel should have been inactive for to long.
+            Assert.IsTrue( this.exceptions.Count > 0 );
+        }
+
+        [Test]
+        public void TestNonFailureSendCase()
+        {
+            InactivityMonitor monitor = new InactivityMonitor( this.transport );
+
+            monitor.Exception += new ExceptionHandler(OnException);
+
+            // Send the local one for the monitor to record.
+            monitor.Oneway( this.localWireFormatInfo );
+
+            ActiveMQMessage message = new ActiveMQMessage();
+            for( int ix = 0; ix < 20; ++ix )
+            {
+                monitor.Oneway( message );
+                Thread.Sleep( 500 );
+                this.transport.InjectCommand( message );
+                Thread.Sleep( 500 );
+            }
+
+            // Channel should have been inactive for to long.
+            Assert.IsTrue( this.exceptions.Count == 0 );
+        }
+
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/Inactivity/InactivityMonitorTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message