activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1031862 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp: ./ Commands/ Protocol/ Transport/
Date Fri, 05 Nov 2010 22:31:25 GMT
Author: tabish
Date: Fri Nov  5 22:31:25 2010
New Revision: 1031862

URL: http://svn.apache.org/viewvc?rev=1031862&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-284

Initial changes for Stomp v1.1: adds the proper headers to the CONNECT frame to activate 1.1
on brokers that support it, and reads the negotiated headers from CONNECTED when 1.1 is
indicated. Adds the read check to the current inactivity monitor. Handles repeated header
entries by keeping only the first one read and discarding the rest.

Added:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/WireFormatInfo.cs
  (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs?rev=1031862&r1=1031861&r2=1031862&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/BaseCommand.cs
Fri Nov  5 22:31:25 2010
@@ -209,6 +209,14 @@ namespace Apache.NMS.Stomp.Commands
             }
         }
 
+        /// <summary>
+        /// Reports whether this command is a WireFormatInfo.  This base
+        /// implementation always answers false; it is virtual so that a
+        /// derived command can answer true.
+        /// </summary>
+        public virtual bool IsWireFormatInfo
+        {
+            get { return false; }
+        }
+
         public virtual bool ResponseRequired
         {
             get

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs?rev=1031862&r1=1031861&r2=1031862&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/Command.cs Fri
Nov  5 22:31:25 2010
@@ -92,6 +92,11 @@ namespace Apache.NMS.Stomp.Commands
             get;
         }
 
+        /// <summary>
+        /// Read-only flag that lets callers test whether a command is a
+        /// WireFormatInfo without a type check.
+        /// </summary>
+        bool IsWireFormatInfo { get; }
+
         Response visit(ICommandVisitor visitor);
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs?rev=1031862&r1=1031861&r2=1031862&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/ConnectionInfo.cs
Fri Nov  5 22:31:25 2010
@@ -22,31 +22,34 @@ namespace Apache.NMS.Stomp.Commands
     public class ConnectionInfo : BaseCommand
     {
         ConnectionId connectionId;
+        string host;
         string clientId;
         string password;
         string userName;
 
-        ///
+        long maxInactivityDuration = 30000;
+        long maxInactivityDurationInitialDelay = 10000;
+
         /// <summery>
-        ///  Get the unique identifier that this object and its own
-        ///  Marshaler share.
+        /// Get the unique identifier that this object and its own
+        /// Marshaler share.
         /// </summery>
-        ///
         public override byte GetDataStructureType()
         {
             return DataStructureTypes.ConnectionInfoType;
         }
 
-        ///
         /// <summery>
-        ///  Returns a string containing the information for this DataStructure
-        ///  such as its type and value of its elements.
+        /// Returns a string containing the information for this DataStructure
+        /// such as its type and value of its elements.
         /// </summery>
-        ///
         public override string ToString()
         {
             return GetType().Name + "[" +
                 "ConnectionId=" + ConnectionId + ", " +
+                "Host=" + Host + ", " +
+                "MaxInactivityDuration=" + MaxInactivityDuration + ", " +
+                "MaxInactivityDurationInitialDelay=" + MaxInactivityDurationInitialDelay
+ ", " +
                 "ClientId=" + ClientId + ", " +
                 "Password=" + Password + ", " +
                 "UserName=" + UserName +
@@ -59,6 +62,12 @@ namespace Apache.NMS.Stomp.Commands
             set { this.connectionId = value; }
         }
 
+        /// <summary>
+        /// Gets or sets the host value held by this ConnectionInfo.
+        /// </summary>
+        public string Host
+        {
+            get
+            {
+                return this.host;
+            }
+            set
+            {
+                this.host = value;
+            }
+        }
+
         public string ClientId
         {
             get { return clientId; }
@@ -77,11 +86,31 @@ namespace Apache.NMS.Stomp.Commands
             set { this.userName = value; }
         }
 
-        ///
+        /// <summary>
+        /// Gets or sets the maximum inactivity duration (field default 30000;
+        /// presumably milliseconds - confirm against the inactivity monitor).
+        /// </summary>
+        public long MaxInactivityDuration
+        {
+            get
+            {
+                return this.maxInactivityDuration;
+            }
+            set
+            {
+                this.maxInactivityDuration = value;
+            }
+        }
+
+        /// <summary>
+        /// Gets or sets the initial delay for inactivity checking (field
+        /// default 10000; presumably milliseconds - confirm against the
+        /// inactivity monitor).
+        /// </summary>
+        public long MaxInactivityDurationInitialDelay
+        {
+            get
+            {
+                return this.maxInactivityDurationInitialDelay;
+            }
+            set
+            {
+                this.maxInactivityDurationInitialDelay = value;
+            }
+        }
+
+        /// <summary>
+        /// Gets the read check interval, which is the current value of
+        /// MaxInactivityDuration.
+        /// </summary>
+        public long ReadCheckInterval
+        {
+            get
+            {
+                return MaxInactivityDuration;
+            }
+        }
+
+        /// <summary>
+        /// Gets the write check interval: one third of the maximum inactivity
+        /// duration (integer division), or the duration itself when it is 3 or
+        /// less.
+        /// </summary>
+        public long WriteCheckInterval
+        {
+            get
+            {
+                if(maxInactivityDuration > 3)
+                {
+                    return maxInactivityDuration / 3;
+                }
+
+                return maxInactivityDuration;
+            }
+        }
+
         /// <summery>
-        ///  Return an answer of true to the isConnectionInfo() query.
+        /// Return an answer of true to the isConnectionInfo() query.
         /// </summery>
-        ///
         public override bool IsConnectionInfo
         {
             get

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs?rev=1031862&r1=1031861&r2=1031862&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/DataStructureTypes.cs
Fri Nov  5 22:31:25 2010
@@ -50,7 +50,8 @@ namespace Apache.NMS.Stomp.Commands
         public const byte RemoveSubscriptionInfoType = 26;
         public const byte ErrorResponseType = 27;
         public const byte KeepAliveInfoType = 28;
-        
+        public const byte WireFormatInfoType = 29;
+
         public const byte DestinationType = 48;
         public const byte TempDestinationType = 49;
         public const byte TopicType = 50;
@@ -132,6 +133,9 @@ namespace Apache.NMS.Stomp.Commands
             case KeepAliveInfoType:
                 packetTypeStr = "KeepAliveInfoType";
                 break;
+            case WireFormatInfoType:
+                packetTypeStr = "WireFormatInfoType";
+                break;
             case DestinationType:
                 packetTypeStr = "DestinationType";
                 break;

Added: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/WireFormatInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/WireFormatInfo.cs?rev=1031862&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/WireFormatInfo.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/WireFormatInfo.cs
Fri Nov  5 22:31:25 2010
@@ -0,0 +1,99 @@
+// /*
+//  * Licensed to the Apache Software Foundation (ASF) under one or more
+//  * contributor license agreements.  See the NOTICE file distributed with
+//  * this work for additional information regarding copyright ownership.
+//  * The ASF licenses this file to You under the Apache License, Version 2.0
+//  * (the "License"); you may not use this file except in compliance with
+//  * the License.  You may obtain a copy of the License at
+//  *
+//  *     http://www.apache.org/licenses/LICENSE-2.0
+//  *
+//  * Unless required by applicable law or agreed to in writing, software
+//  * distributed under the License is distributed on an "AS IS" BASIS,
+//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  * See the License for the specific language governing permissions and
+//  * limitations under the License.
+//  */
+// 
+using System;
+
+using Apache.NMS.Stomp.State;
+
+namespace Apache.NMS.Stomp.Commands
+{
+    /// <summary>
+    /// Command that carries wire format settings: the protocol version, the
+    /// session id and the read / write check intervals.
+    /// </summary>
+    public class WireFormatInfo : BaseCommand
+    {
+        private long writeCheckInterval = 0;
+        private long readCheckInterval = 0;
+        private float version = 1.0f;
+        private string session;
+
+        public WireFormatInfo()
+        {
+        }
+
+        /// <summary>
+        /// Gets or sets the write check interval; 0 until assigned.
+        /// </summary>
+        public long WriteCheckInterval
+        {
+            get { return this.writeCheckInterval; }
+            set { this.writeCheckInterval = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the read check interval; 0 until assigned.
+        /// </summary>
+        public long ReadCheckInterval
+        {
+            get { return this.readCheckInterval; }
+            set { this.readCheckInterval = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the protocol version; 1.0 until assigned.
+        /// </summary>
+        public float Version
+        {
+            get { return this.version; }
+            set { this.version = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the session id; null until assigned.
+        /// </summary>
+        public string Session
+        {
+            get { return this.session; }
+            set { this.session = value; }
+        }
+
+        /// <summary>
+        /// Get the unique identifier that this object and its own
+        /// Marshaler share.
+        /// </summary>
+        public override byte GetDataStructureType()
+        {
+            // Must be this command's own id; returning TransactionInfoType
+            // would make a WireFormatInfo indistinguishable from a
+            // TransactionInfo to anything that dispatches on the type byte.
+            return DataStructureTypes.WireFormatInfoType;
+        }
+
+        /// <summary>
+        /// Returns a string containing the information for this DataStructure
+        /// such as its type and value of its elements.
+        /// </summary>
+        public override string ToString()
+        {
+            return GetType().Name + "[" +
+                "WriteCheckInterval=" + WriteCheckInterval + ", " +
+                "ReadCheckInterval=" + ReadCheckInterval + ", " +
+                "Session=" + Session + ", " +
+                "Version=" + Version +
+                "]";
+        }
+
+        /// <summary>
+        /// Return an answer of true to the IsWireFormatInfo query.
+        /// </summary>
+        public override bool IsWireFormatInfo
+        {
+            get
+            {
+                return true;
+            }
+        }
+
+        /// <summary>
+        /// Always returns null; no method is invoked on the visitor.
+        /// </summary>
+        public override Response visit(ICommandVisitor visitor)
+        {
+            return null;
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Commands/WireFormatInfo.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs?rev=1031862&r1=1031861&r2=1031862&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Connection.cs Fri Nov
 5 22:31:25 2010
@@ -563,14 +563,17 @@ namespace Apache.NMS.Stomp
         /// <param name="command">A  Command</param>
         protected void OnCommand(ITransport commandTransport, Command command)
         {
-            if(command is MessageDispatch)
+            if(command.IsMessageDispatch)
             {
                 // We wait if the Connection is still processing interruption
                 // code to reset the MessageConsumers.
                 WaitForTransportInterruptionProcessingToComplete();
                 DispatchMessage((MessageDispatch) command);
             }
-            else if(command is ConnectionError)
+            else if(command.IsWireFormatInfo)
+            {
+            }
+            else if(command.IsErrorCommand)
             {
                 if(!closing && !closed)
                 {

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs?rev=1031862&r1=1031861&r2=1031862&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
Fri Nov  5 22:31:25 2010
@@ -166,8 +166,14 @@ namespace Apache.NMS.Stomp.Protocol
                 {
                     string key = line.Substring(0, idx);
                     string value = line.Substring(idx + 1);
-                    
-                    this.properties[key] = value;
+
+                    // Stomp v1.1+ allows repeated header entries.  Only the first
+                    // entry read for a given key is kept; later duplicates are
+                    // discarded.  The lookup has to be made with the key: testing
+                    // the value would let a repeated header overwrite the first one.
+                    // NOTE(review): assumes properties.Contains() is a key lookup
+                    // (IDictionary / Hashtable semantics) - confirm the field type.
+                    if(!this.properties.Contains(key))
+                    {
+                        this.properties[key] = value;
+                    }
                 }
                 else
                 {

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=1031862&r1=1031861&r2=1031862&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
Fri Nov  5 22:31:25 2010
@@ -31,6 +31,7 @@ namespace Apache.NMS.Stomp.Protocol
         private Encoding encoder = new UTF8Encoding();
         private IPrimitiveMapMarshaler mapMarshaler = new XmlPrimitiveMapMarshaler();
         private ITransport transport;
+        private WireFormatInfo remoteWireFormatInfo;
 
         public StompWireFormat()
         {
@@ -125,7 +126,7 @@ namespace Apache.NMS.Stomp.Protocol
         {
             string command = frame.Command;
             
-            if(command == "RECEIPT" || command == "CONNECTED")
+            if(command == "RECEIPT")
             {
                 string text = frame.RemoveProperty("receipt-id");
                 if(text != null)
@@ -141,19 +142,11 @@ namespace Apache.NMS.Stomp.Protocol
                     answer.CorrelationId = Int32.Parse(text);
                     return answer;
                 }
-                else if(command == "CONNECTED")
-                {
-                    text = frame.RemoveProperty("response-id");
-
-                    Tracer.Debug("StompWireFormat - Received CONNECTED command: ResponseId
= " + text);
-                    
-                    if(text != null)
-                    {
-                        Response answer = new Response();
-                        answer.CorrelationId = Int32.Parse(text);
-                        return answer;
-                    }
-                }
+            }
+            else if(command == "CONNECTED")
+            {
+                Tracer.Debug("StompWireFormat - Received CONNECTED command");
+                return ReadConnected(frame);
             }
             else if(command == "ERROR")
             {
@@ -192,6 +185,50 @@ namespace Apache.NMS.Stomp.Protocol
             return null;
         }
 
+        /// <summary>
+        /// Converts a CONNECTED frame into a WireFormatInfo and remembers it as
+        /// the remote wire format.  When the frame carries a "version" header
+        /// the version, session and heart-beat headers are read from it;
+        /// otherwise the remote side is recorded as version 1.0 with both check
+        /// intervals set to 0.  If the frame has a "response-id" header, a
+        /// Response correlated to that id is handed to SendCommand.
+        /// </summary>
+        /// <param name="frame">The CONNECTED frame whose headers are consumed.</param>
+        /// <returns>The WireFormatInfo built from the frame.</returns>
+        /// <exception cref="IOException">
+        /// The heart-beat header does not contain exactly two comma separated values.
+        /// </exception>
+        protected virtual Command ReadConnected(StompFrame frame)
+        {
+            string responseId = frame.RemoveProperty("response-id");
+
+            this.remoteWireFormatInfo = new WireFormatInfo();
+
+            if(frame.HasProperty("version"))
+            {
+                // The version header is a dotted value such as "1.1", which
+                // Int32.Parse rejects with a FormatException.  Parse it as a
+                // float with the invariant culture so that '.' is always the
+                // decimal separator, whatever the machine's regional settings.
+                remoteWireFormatInfo.Version = Single.Parse(
+                    frame.RemoveProperty("version"),
+                    System.Globalization.CultureInfo.InvariantCulture);
+
+                if(frame.HasProperty("session"))
+                {
+                    remoteWireFormatInfo.Session = frame.RemoveProperty("session");
+                }
+
+                if(frame.HasProperty("heart-beat"))
+                {
+                    string[] heartBeats = frame.RemoveProperty("heart-beat").Split(',');
+                    if(heartBeats.Length != 2)
+                    {
+                        throw new IOException("Malformed heartbeat property in Connected Frame.");
+                    }
+
+                    // First value feeds the write interval, second the read interval.
+                    remoteWireFormatInfo.WriteCheckInterval = Int32.Parse(heartBeats[0].Trim());
+                    remoteWireFormatInfo.ReadCheckInterval = Int32.Parse(heartBeats[1].Trim());
+                }
+            }
+            else
+            {
+                // No version header: record a 1.0 peer with heart-beats disabled.
+                remoteWireFormatInfo.ReadCheckInterval = 0;
+                remoteWireFormatInfo.WriteCheckInterval = 0;
+                remoteWireFormatInfo.Version = 1.0f;
+            }
+
+            if(responseId != null)
+            {
+                Response answer = new Response();
+                answer.CorrelationId = Int32.Parse(responseId);
+                SendCommand(answer);
+            }
+
+            return remoteWireFormatInfo;
+        }
+
         protected virtual Command ReadMessage(StompFrame frame)
         {
             Message message = null;
@@ -400,6 +437,13 @@ namespace Apache.NMS.Stomp.Protocol
             frame.SetProperty("login", command.UserName);
             frame.SetProperty("passcode", command.Password);
             frame.SetProperty("request-id", command.CommandId);
+            frame.SetProperty("host", command.Host);
+            frame.SetProperty("accept-version", "1.0,1.1");
+
+            if(command.MaxInactivityDuration != 0)
+            {
+                frame.SetProperty("heart-beat", command.WriteCheckInterval + "," + command.ReadCheckInterval);
+            }
 
             frame.ToStream(dataOut);
         }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs?rev=1031862&r1=1031861&r2=1031862&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/ITransport.cs
Fri Nov  5 22:31:25 2010
@@ -48,39 +48,18 @@ namespace Apache.NMS.Stomp.Transport
         /// wait for the response, instead a FutureResponse object is returned that the
         /// caller can use to wait on the Broker's response.
         /// </summary>
-        /// <param name="command">
-        /// A <see cref="Command"/>
-        /// </param>
-        /// <returns>
-        /// A <see cref="FutureResponse"/>
-        /// </returns>
 		FutureResponse AsyncRequest(Command command);
 
         /// <summary>
         /// Sends a Command to the Broker and waits for a Response to that Command before
         /// returning, this version waits indefinitely for a response.
         /// </summary>
-        /// <param name="command">
-        /// A <see cref="Command"/>
-        /// </param>
-        /// <returns>
-        /// A <see cref="Response"/>
-        /// </returns>
 		Response Request(Command command);
 
         /// <summary>
         /// Sends a Command to the Broker and waits for the given TimeSpan to expire for
a
         /// response before returning.  
         /// </summary>
-        /// <param name="command">
-        /// A <see cref="Command"/>
-        /// </param>
-        /// <param name="timeout">
-        /// A <see cref="TimeSpan"/>
-        /// </param>
-        /// <returns>
-        /// A <see cref="Response"/>
-        /// </returns>
 		Response Request(Command command, TimeSpan timeout);
 
         /// <summary>
@@ -89,12 +68,6 @@ namespace Apache.NMS.Stomp.Transport
         /// object in the Transport chain and set or get properties on that specific
         /// instance.  If the requested type isn't in the chain than Null is returned.
         /// </summary>
-        /// <param name="type">
-        /// A <see cref="Type"/>
-        /// </param>
-        /// <returns>
-        /// A <see cref="System.Object"/>
-        /// </returns>
         Object Narrow(Type type);            
 
 		CommandHandler Command

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1031862&r1=1031861&r2=1031862&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
Fri Nov  5 22:31:25 2010
@@ -19,7 +19,6 @@ using System;
 using System.Threading;
 using Apache.NMS.Stomp.Commands;
 using Apache.NMS.Stomp.Threads;
-using Apache.NMS.Stomp.Util;
 using Apache.NMS.Util;
 
 namespace Apache.NMS.Stomp.Transport
@@ -40,27 +39,41 @@ namespace Apache.NMS.Stomp.Transport
         private readonly Atomic<bool> inRead = new Atomic<bool>(false);
         private readonly Atomic<bool> inWrite = new Atomic<bool>(false);
 
-        private DedicatedTaskRunner asyncTask;
+        private CompositeTaskRunner asyncTasks;
+        private AsyncSignalReadErrorkTask asyncErrorTask;
         private AsyncWriteTask asyncWriteTask;
 
         private readonly Mutex monitor = new Mutex();
 
         private Timer connectionCheckTimer;
 
-        private long maxInactivityDuration = 10000;
-        public long MaxInactivityDuration
+        private DateTime lastReadCheckTime;
+
+        private long readCheckTime;
+        public long ReadCheckTime
+        {
+            get { return this.readCheckTime; }
+            set { this.readCheckTime = value; }
+        }
+
+        private long writeCheckTime;
+        public long WriteCheckTime
         {
-            get { return this.maxInactivityDuration; }
-            set { this.maxInactivityDuration = value; }
+            get { return this.writeCheckTime; }
+            set { this.writeCheckTime = value; }
         }
 
-        private long maxInactivityDurationInitialDelay = 10000;
-        public long MaxInactivityDurationInitialDelay
+        private long initialDelayTime;
+        public long InitialDelayTime
         {
-            get { return this.maxInactivityDurationInitialDelay; }
-            set { this.maxInactivityDurationInitialDelay = value; }
+            get { return this.initialDelayTime; }
+            set { this.initialDelayTime = value; }
         }
 
+        // Local and remote Wire Format Information
+        private ConnectionInfo localWireFormatInfo;
+        private WireFormatInfo remoteWireFormatInfo;
+
         /// <summary>
         /// Constructor or the Inactivity Monitor
         /// </summary>
@@ -87,23 +100,34 @@ namespace Apache.NMS.Stomp.Transport
 
             base.Dispose(disposing);
         }
+        
+        /// <summary>
+        /// Timer callback that runs the write check followed by the read check.
+        /// </summary>
+        /// <param name="state">Timer state object; not used.</param>
+        public void CheckConnection(object state)
+        {
+            // First see if we have written or can write.
+            WriteCheck();
+            
+            // Now check if we've read anything; if not, ReadCheck marks the
+            // read error task as pending so an inactivity error is raised.
+            ReadCheck();
+        }
 
         #region WriteCheck Related
         /// <summary>
         /// Check the write to the broker
         /// </summary>
-        public void WriteCheck(object unused)
+        public void WriteCheck()
         {
             if(this.inWrite.Value || this.failed.Value)
             {
-                Tracer.Debug("Inactivity Monitor is in write or already failed.");
+                Tracer.Debug("Inactivity Monitor is in write or already failed.");      
       
                 return;
             }
 
             if(!commandSent.Value)
             {
                 Tracer.Debug("No Message sent since last write check. Sending a KeepAliveInfo");
-                this.asyncTask.Wakeup();
+                this.asyncWriteTask.IsPending = true;
+                this.asyncTasks.Wakeup();
             }
             else
             {
@@ -114,6 +138,50 @@ namespace Apache.NMS.Stomp.Transport
         }
         #endregion
 
+        #region ReadCheck Related
+        /// <summary>
+        /// Verifies that something has been received since the previous read
+        /// check and, if not, marks the read error task as pending and wakes
+        /// the task runner.  Nothing is done when read checking is not set up,
+        /// when too little time has elapsed since the last check (see
+        /// AllowReadCheck), while a receive is in progress, or once the monitor
+        /// has already failed.
+        /// </summary>
+        public void ReadCheck()
+        {
+            // StartMonitorThreads only creates the read error task when the
+            // local and remote intervals are both non-zero, yet the shared
+            // timer still calls this method.  Work on local copies so that a
+            // task that was never created, or a runner already cleared by
+            // StopMonitorThreads, cannot raise a NullReferenceException on the
+            // timer thread.
+            AsyncSignalReadErrorkTask errorTask = this.asyncErrorTask;
+            CompositeTaskRunner taskRunner = this.asyncTasks;
+            if(errorTask == null || taskRunner == null)
+            {
+                Tracer.Debug("Inactivity Monitor: Read checks are not enabled.");
+                return;
+            }
+
+            DateTime now = DateTime.Now;
+            TimeSpan elapsed = now - this.lastReadCheckTime;
+
+            if(!AllowReadCheck(elapsed))
+            {
+                Tracer.Debug("Inactivity Monitor: A read check is not currently allowed.");
+                return;
+            }
+
+            this.lastReadCheckTime = now;
+
+            if(this.inRead.Value || this.failed.Value)
+            {
+                Tracer.Debug("A receive is in progress or already failed.");
+                return;
+            }
+
+            if(!commandReceived.Value)
+            {
+                Tracer.Debug("No message received since last read check! Sending an InactivityException!");
+                errorTask.IsPending = true;
+                taskRunner.Wakeup();
+            }
+            else
+            {
+                // Re-arm: the flag must be set again before the next check.
+                commandReceived.Value = false;
+            }
+        }
+
+        /// <summary>
+        /// Decides whether a read check may run now.  The check is allowed only
+        /// when more than 90% of the read check time has elapsed; otherwise the
+        /// read check is skipped.
+        /// </summary>
+        /// <param name="elapsed">Time elapsed since the previous read check.</param>
+        /// <returns>
+        /// True when the elapsed milliseconds are strictly greater than
+        /// readCheckTime * 9 / 10 (computed with integer arithmetic).
+        /// </returns>
+        public bool AllowReadCheck(TimeSpan elapsed)
+        {
+            return (elapsed.TotalMilliseconds > (readCheckTime * 9 / 10));
+        }
+        #endregion
+
         public override void Stop()
         {
             StopMonitorThreads();
@@ -126,15 +194,21 @@ namespace Apache.NMS.Stomp.Transport
             inRead.Value = true;
             try
             {
-                try
-                {
-                    StartMonitorThreads();
-                }
-                catch(IOException ex)
+                if(command is WireFormatInfo)
                 {
-                    OnException(this, ex);
+                    lock(monitor)
+                    {
+                        remoteWireFormatInfo = command as WireFormatInfo;
+                        try
+                        {
+                            StartMonitorThreads();
+                        }
+                        catch(IOException ex)
+                        {
+                            OnException(this, ex);
+                        }
+                    }
                 }
-
                 base.OnCommand(sender, command);
             }
             finally
@@ -146,9 +220,9 @@ namespace Apache.NMS.Stomp.Transport
         public override void Oneway(Command command)
         {
             // Disable inactivity monitoring while processing a command.
-            // synchronize this method - its not synchronized
-            // further down the transport stack and gets called by more
-            // than one thread  by this class
+            //synchronize this method - its not synchronized
+            //further down the transport stack and gets called by more
+            //than one thread  by this class
             lock(inWrite)
             {
                 inWrite.Value = true;
@@ -158,7 +232,14 @@ namespace Apache.NMS.Stomp.Transport
                     {
                         throw new IOException("Channel was inactive for too long: " + next.RemoteAddress.ToString());
                     }
-
+                    if(command.IsConnectionInfo)
+                    {
+                        lock(monitor)
+                        {
+                            localWireFormatInfo = command as ConnectionInfo;
+                            StartMonitorThreads();
+                        }
+                    }
                     next.Oneway(command);
                 }
                 finally
@@ -183,25 +264,77 @@ namespace Apache.NMS.Stomp.Transport
         {
             lock(monitor)
             {
-                if(monitorStarted.Value || maxInactivityDuration == 0)
+                if(monitorStarted.Value)
                 {
                     return;
                 }
 
-                Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", maxInactivityDuration
);
-				Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", maxInactivityDurationInitialDelay
);
+                if(localWireFormatInfo == null)
+                {
+                    return;
+                }
 
-                this.asyncWriteTask = new AsyncWriteTask(this);
-                this.asyncTask = new DedicatedTaskRunner(this.asyncWriteTask);
+                if(remoteWireFormatInfo == null)
+                {
+                    return;
+                }
+
+                if(localWireFormatInfo.MaxInactivityDuration != 0 &&
+                   remoteWireFormatInfo.WriteCheckInterval != 0)
+                {
+                    readCheckTime =
+                        Math.Max(
+                            localWireFormatInfo.ReadCheckInterval,
+                            remoteWireFormatInfo.WriteCheckInterval);
+
+                    this.asyncErrorTask = new AsyncSignalReadErrorkTask(this, next.RemoteAddress);
+                }
+
+                if(localWireFormatInfo.MaxInactivityDuration != 0)
+                {
+                    if(remoteWireFormatInfo.Version > 1.0)
+                    {
+                        writeCheckTime =
+                            Math.Max(localWireFormatInfo.WriteCheckInterval,
+                                     remoteWireFormatInfo.ReadCheckInterval);
+                    }
+                    else
+                    {
+                        writeCheckTime = localWireFormatInfo.MaxInactivityDuration;
+                    }
 
-                monitorStarted.Value = true;
+                    this.asyncWriteTask = new AsyncWriteTask(this);
+                }
+
+                initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay;
+                
+                Tracer.DebugFormat("Inactivity: Read Check time interval: {0}", readCheckTime
);
+                Tracer.DebugFormat("Inactivity: Initial Delay time interval: {0}", initialDelayTime
);
+                Tracer.DebugFormat("Inactivity: Write Check time interval: {0}", writeCheckTime
);
+
+                this.asyncTasks = new CompositeTaskRunner();
+
+                if(this.asyncErrorTask != null)
+                {
+                    this.asyncTasks.AddTask(this.asyncErrorTask);
+                }
+
+                if(this.asyncWriteTask != null)
+                {
+                    this.asyncTasks.AddTask(this.asyncWriteTask);
+                }
+
+                if(this.asyncErrorTask != null || this.asyncWriteTask != null)
+                {
+                    monitorStarted.Value = true;
 
-                this.connectionCheckTimer = new Timer(
-                    new TimerCallback(WriteCheck),
-                    null,
-                    maxInactivityDurationInitialDelay,
-                    maxInactivityDuration
-                    );
+                    this.connectionCheckTimer = new Timer(
+                        new TimerCallback(CheckConnection),
+                        null,
+                        initialDelayTime,
+                        writeCheckTime
+                        );
+                }
             }
         }
 
@@ -211,37 +344,78 @@ namespace Apache.NMS.Stomp.Transport
             {
                 if(monitorStarted.CompareAndSet(true, false))
                 {
+                    AutoResetEvent shutdownEvent = new AutoResetEvent(false);
+
                     // Attempt to wait for the Timer to shutdown, but don't wait
                     // forever, if they don't shutdown after two seconds, just quit.
-                    ThreadUtil.DisposeTimer(connectionCheckTimer, 2000);
+                    this.connectionCheckTimer.Dispose(shutdownEvent);
+                    shutdownEvent.WaitOne(TimeSpan.FromMilliseconds(2000), false);
 
-                    this.asyncTask.Shutdown();
-                    this.asyncTask = null;
+                    this.asyncTasks.Shutdown();
+                    this.asyncTasks = null;
                     this.asyncWriteTask = null;
+                    this.asyncErrorTask = null;
                 }
             }
         }
 
         #region Async Tasks
+        // Task that fires when the TaskRunner is signaled by the ReadCheck Timer Task.
+        class AsyncSignalReadErrorkTask : CompositeTask
+        {
+            private readonly InactivityMonitor parent;
+            private readonly Uri remote;
+            private readonly Atomic<bool> pending = new Atomic<bool>(false);
+
+            public AsyncSignalReadErrorkTask(InactivityMonitor parent, Uri remote)
+            {
+                this.parent = parent;
+                this.remote = remote;
+            }
+
+            public bool IsPending
+            {
+                get { return this.pending.Value; }
+                set { this.pending.Value = value; }
+            }
+
+            public bool Iterate()
+            {
+                if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
+                {
+                    IOException ex = new IOException("Channel was inactive for too long: " + remote);
+                    this.parent.OnException(parent, ex);
+                }
+
+                return this.pending.Value;
+            }
+        }
 
         // Task that fires when the TaskRunner is signaled by the WriteCheck Timer Task.
-        class AsyncWriteTask : Task
+        class AsyncWriteTask : CompositeTask
         {
             private readonly InactivityMonitor parent;
+            private readonly Atomic<bool> pending = new Atomic<bool>(false);
 
             public AsyncWriteTask(InactivityMonitor parent)
             {
                 this.parent = parent;
             }
 
+            public bool IsPending
+            {
+                get { return this.pending.Value; }
+                set { this.pending.Value = value; }
+            }
+
             public bool Iterate()
             {
-				Tracer.Debug("AsyncWriteTask perparing for another Write Check");
-                if(this.parent.monitorStarted.Value)
+                Tracer.Debug("AsyncWriteTask perparing for another Write Check");
+                if(this.pending.CompareAndSet(true, false) && this.parent.monitorStarted.Value)
                 {
                     try
                     {
-						Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive.");
+                        Tracer.Debug("AsyncWriteTask Write Check required sending KeepAlive.");
                         KeepAliveInfo info = new KeepAliveInfo();
                         this.parent.Oneway(info);
                     }
@@ -251,7 +425,7 @@ namespace Apache.NMS.Stomp.Transport
                     }
                 }
 
-                return false;
+                return this.pending.Value;
             }
         }
         #endregion



Mime
View raw message