activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1197353 - in /activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp: Protocol/StompFrame.cs Protocol/StompFrameStream.cs Protocol/StompWireFormat.cs Transport/InactivityMonitor.cs
Date Thu, 03 Nov 2011 23:45:07 GMT
Author: tabish
Date: Thu Nov  3 23:45:07 2011
New Revision: 1197353

URL: http://svn.apache.org/viewvc?rev=1197353&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-345

Removed:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrameStream.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
    activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs?rev=1197353&r1=1197352&r2=1197353&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs	(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompFrame.cs	Thu Nov  3 23:45:07 2011
@@ -33,10 +33,18 @@ namespace Apache.NMS.Stomp.Protocol
         public const byte FRAME_TERMINUS = (byte) 0;
         /// Used to denote a Special KeepAlive command that consists of a single newline.
         public const String KEEPALIVE = "KEEPALIVE";
-        
+
+        public const byte BREAK = (byte)('\n');
+        public const byte COLON = (byte)(':');
+        public const byte ESCAPE = (byte)('\\');
+        public readonly byte[] ESCAPE_ESCAPE_SEQ = new byte[2]{ 92, 92 };
+        public readonly byte[] COLON_ESCAPE_SEQ = new byte[2]{ 92, 99 };
+        public readonly byte[] NEWLINE_ESCAPE_SEQ = new byte[2]{ 92, 110 };
+
         private string command;
         private IDictionary properties = new Hashtable();
         private byte[] content;
+        private bool encodingEnabled;
 
         private readonly Encoding encoding = new UTF8Encoding();
         
@@ -44,10 +52,27 @@ namespace Apache.NMS.Stomp.Protocol
         {
         }
 
+        public StompFrame(bool encodingEnabled)
+        {
+            this.encodingEnabled = encodingEnabled;
+        }
+
         public StompFrame(string command)
         {
             this.command = command;
         }
+
+        public StompFrame(string command, bool encodingEnabled)
+        {
+            this.command = command;
+            this.encodingEnabled = encodingEnabled;
+        }
+
+        public bool EncodingEnabled
+        {
+            get { return this.encodingEnabled; }
+            set { this.encodingEnabled = value; }
+        }
         
         public byte[] Content
         {
@@ -152,7 +177,7 @@ namespace Apache.NMS.Stomp.Protocol
             {
                 builder.Append(key);
                 builder.Append(SEPARATOR);
-                builder.Append(this.Properties[key] as string);
+                builder.Append(EncodeHeader(this.Properties[key] as string));
                 builder.Append(NEWLINE);
             }
 
@@ -205,7 +230,7 @@ namespace Apache.NMS.Stomp.Protocol
                     // to store them all but for now we just throw the rest out.
                     if(!this.properties.Contains(key))
                     {
-                        this.properties[key] = value;
+                        this.properties[key] = DecodeHeader(value);
                     }
                 }
                 else
@@ -267,6 +292,86 @@ namespace Apache.NMS.Stomp.Protocol
             
             byte[] data = ms.ToArray();
             return encoding.GetString(data, 0, data.Length);
-        }        
+        }
+
+        private String EncodeHeader(String header)
+        {
+            String result = header;
+            if(this.encodingEnabled)
+            {
+                byte[] utf8buf = this.encoding.GetBytes(header);
+                MemoryStream stream = new MemoryStream(utf8buf.Length);
+                foreach(byte val in utf8buf)
+                {
+                    switch(val)
+                    {
+                    case ESCAPE:
+                        stream.Write(ESCAPE_ESCAPE_SEQ, 0, ESCAPE_ESCAPE_SEQ.Length);
+                        break;
+                    case BREAK:
+                        stream.Write(NEWLINE_ESCAPE_SEQ, 0, NEWLINE_ESCAPE_SEQ.Length);
+                        break;
+                    case COLON:
+                        stream.Write(COLON_ESCAPE_SEQ, 0, COLON_ESCAPE_SEQ.Length);
+                        break;
+                    default:
+                        stream.WriteByte(val);
+                        break;
+                    }
+                }
+
+                byte[] data = stream.ToArray();
+                result = encoding.GetString(data, 0, data.Length);
+            }
+
+            return result;
+        }
+
+        private String DecodeHeader(String header)
+        {
+            MemoryStream decoded = new MemoryStream();
+
+            int value = -1;
+            byte[] utf8buf = this.encoding.GetBytes(header);
+            MemoryStream stream = new MemoryStream(utf8buf);
+
+            while((value = stream.ReadByte()) != -1)
+            {
+                if(value == 92)
+                {
+                    int next = stream.ReadByte();
+                    if (next != -1)
+                    {
+                        switch(next) {
+                        case 110:
+                            decoded.WriteByte(BREAK);
+                            break;
+                        case 99:
+                            decoded.WriteByte(COLON);
+                            break;
+                        case 92:
+                            decoded.WriteByte(ESCAPE);
+                            break;
+                        default:
+                            stream.Seek(-1, SeekOrigin.Current);
+                            decoded.WriteByte((byte)value);
+                            break;
+                        }
+                    }
+                    else
+                    {
+                        decoded.WriteByte((byte)value);
+                    }
+
+                }
+                else
+                {
+                    decoded.WriteByte((byte)value);
+                }
+            }
+
+            byte[] data = decoded.ToArray();
+            return encoding.GetString(data, 0, data.Length);
+        }
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs?rev=1197353&r1=1197352&r2=1197353&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs	(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Protocol/StompWireFormat.cs	Thu Nov  3 23:45:07 2011
@@ -33,6 +33,7 @@ namespace Apache.NMS.Stomp.Protocol
         private ITransport transport;
         private WireFormatInfo remoteWireFormatInfo;
         private int connectedResponseId = -1;
+        private bool encodeHeaders = false;
 
         public StompWireFormat()
         {
@@ -116,7 +117,7 @@ namespace Apache.NMS.Stomp.Protocol
 
         public Object Unmarshal(BinaryReader dataIn)
         {            
-            StompFrame frame = new StompFrame();
+            StompFrame frame = new StompFrame(this.encodeHeaders);
             frame.FromStream(dataIn);
             
             Object answer = CreateCommand(frame);
@@ -197,6 +198,11 @@ namespace Apache.NMS.Stomp.Protocol
             {
                 remoteWireFormatInfo.Version = Single.Parse(frame.RemoveProperty("version"));
 
+                if(remoteWireFormatInfo.Version > 1.0f)
+                {
+                    this.encodeHeaders = true;
+                }
+
                 if(frame.HasProperty("session"))
                 {
                     remoteWireFormatInfo.Session = frame.RemoveProperty("session");
@@ -333,7 +339,7 @@ namespace Apache.NMS.Stomp.Protocol
 
         protected virtual void WriteMessage(Message command, BinaryWriter dataOut)
         {
-            StompFrame frame = new StompFrame("SEND");
+            StompFrame frame = new StompFrame("SEND", encodeHeaders);
             if(command.ResponseRequired)
             {
                 frame.SetProperty("receipt", command.CommandId);
@@ -418,7 +424,7 @@ namespace Apache.NMS.Stomp.Protocol
 
         protected virtual void WriteMessageAck(MessageAck command, BinaryWriter dataOut)
         {
-            StompFrame frame = new StompFrame("ACK");
+            StompFrame frame = new StompFrame("ACK", encodeHeaders);
             if(command.ResponseRequired)
             {
                 frame.SetProperty("receipt", "ignore:" + command.CommandId);
@@ -443,8 +449,7 @@ namespace Apache.NMS.Stomp.Protocol
         protected virtual void WriteConnectionInfo(ConnectionInfo command, BinaryWriter dataOut)
         {
             // lets force a receipt for the Connect Frame.
-            
-            StompFrame frame = new StompFrame("CONNECT");
+            StompFrame frame = new StompFrame("CONNECT", encodeHeaders);
 
             frame.SetProperty("client-id", command.ClientId);
             if(!String.IsNullOrEmpty(command.UserName))
@@ -477,7 +482,7 @@ namespace Apache.NMS.Stomp.Protocol
         {
             System.Diagnostics.Debug.Assert(!command.ResponseRequired);
 
-            StompFrame frame = new StompFrame("DISCONNECT");
+            StompFrame frame = new StompFrame("DISCONNECT", encodeHeaders);
 
             if(Tracer.IsDebugEnabled)
             {
@@ -489,7 +494,7 @@ namespace Apache.NMS.Stomp.Protocol
 
         protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter dataOut)
         {
-            StompFrame frame = new StompFrame("SUBSCRIBE");
+            StompFrame frame = new StompFrame("SUBSCRIBE", encodeHeaders);
 
             if(command.ResponseRequired)
             {
@@ -552,7 +557,7 @@ namespace Apache.NMS.Stomp.Protocol
 
         protected virtual void WriteKeepAliveInfo(KeepAliveInfo command, BinaryWriter dataOut)
         {
-            StompFrame frame = new StompFrame(StompFrame.KEEPALIVE);
+            StompFrame frame = new StompFrame(StompFrame.KEEPALIVE, encodeHeaders);
 
             if(Tracer.IsDebugEnabled)
             {
@@ -564,7 +569,7 @@ namespace Apache.NMS.Stomp.Protocol
 
         protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut)
         {
-            StompFrame frame = new StompFrame("UNSUBSCRIBE");
+            StompFrame frame = new StompFrame("UNSUBSCRIBE", encodeHeaders);
             object id = command.ObjectId;
 
             if(id is ConsumerId)
@@ -604,7 +609,7 @@ namespace Apache.NMS.Stomp.Protocol
             Tracer.Debug("StompWireFormat - For transaction type: " + transactionType + 
                          " we are using command type: " + type);
 
-            StompFrame frame = new StompFrame(type);
+            StompFrame frame = new StompFrame(type, encodeHeaders);
             if(command.ResponseRequired)
             {
                 frame.SetProperty("receipt", command.CommandId);

Modified: activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs?rev=1197353&r1=1197352&r2=1197353&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs	(original)
+++ activemq/activemq-dotnet/Apache.NMS.Stomp/trunk/src/main/csharp/Transport/InactivityMonitor.cs	Thu Nov  3 23:45:07 2011
@@ -335,7 +335,8 @@ namespace Apache.NMS.Stomp.Transport
                     this.asyncWriteTask = new AsyncWriteTask(this);
                 }
 
-                initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay;
+                initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay > 0 ?
+                                       localWireFormatInfo.MaxInactivityDurationInitialDelay : writeCheckTime;
 
                 Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}",
                                    instanceId, readCheckTime );



Mime
View raw message