activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r835062 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: ./ Commands/
Date Wed, 11 Nov 2009 20:56:22 GMT
Author: tabish
Date: Wed Nov 11 20:56:21 2009
New Revision: 835062

URL: http://svn.apache.org/viewvc?rev=835062&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-205

Added an ICompressionPolicy to NMS.ActiveMQ and a default implementation that uses the GZipStream
built into .NET

Left it confined to the NMS.ActiveMQ module for now. ICompressionPolicy could be moved to
the NMS module if the other NMS modules can actually make use of compression; it is not yet
clear whether they can.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/CompressionPolicy.cs
  (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ICompressionPolicy.cs
  (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs?rev=835062&r1=835061&r2=835062&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQBytesMessage.cs
Wed Nov 11 20:56:21 2009
@@ -498,7 +498,7 @@
                     EndianBinaryReader reader = new EndianBinaryReader(target);
                     this.length = reader.ReadInt32();
                     
-                    target = new DeflateStream(target, CompressionMode.Decompress);
+                    target = this.Connection.CompressionPolicy.CreateDecompressionStream(target);
                 }
                 else
                 {
@@ -522,7 +522,7 @@
                     this.length = 0;
 					this.Compressed = true;
 
-                    target = new DeflateStream(target, CompressionMode.Compress);
+                    target = this.Connection.CompressionPolicy.CreateCompressionStream(target);
                   
                     target = new LengthTrackerStream(target, this);
                 }
                 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs?rev=835062&r1=835061&r2=835062&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQMapMessage.cs
Wed Nov 11 20:56:21 2009
@@ -74,7 +74,7 @@
 
 						if(this.Connection != null && this.Compressed)
 						{
-	                    	source = new DeflateStream(source, CompressionMode.Decompress);
+                            source = this.Connection.CompressionPolicy.CreateDecompressionStream(source);
 						}
 
 						this.body = PrimitiveMap.Unmarshal(source);
@@ -104,8 +104,7 @@
 
                 if(this.Connection != null && this.Connection.UseCompression)
                 {
-                    target = new DeflateStream(target, CompressionMode.Compress);
-					
+                    target = this.Connection.CompressionPolicy.CreateCompressionStream(target);
 					this.Compressed = true;
                 }
                 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs?rev=835062&r1=835061&r2=835062&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQStreamMessage.cs
Wed Nov 11 20:56:21 2009
@@ -888,7 +888,7 @@
                 Stream target = this.byteBuffer;
                 if(this.Connection != null && this.Compressed == true)
                 {
-                    target = new DeflateStream(this.byteBuffer, CompressionMode.Decompress);
+                    target = this.Connection.CompressionPolicy.CreateDecompressionStream(target);
                 }
                 
 				this.dataIn = new EndianBinaryReader(target);
@@ -901,12 +901,11 @@
 			if(this.dataOut == null)
 			{
                 this.byteBuffer = new MemoryStream();
-
                 Stream target = this.byteBuffer;
+                
                 if(this.Connection != null && this.Connection.UseCompression)
                 {
-                    target = new DeflateStream(this.byteBuffer, CompressionMode.Compress);
-					
+                    target = this.Connection.CompressionPolicy.CreateCompressionStream(target);
 					this.Compressed = true;
                 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs?rev=835062&r1=835061&r2=835062&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ActiveMQTextMessage.cs
Wed Nov 11 20:56:21 2009
@@ -75,7 +75,7 @@
 
                         if(this.Connection != null && this.Compressed == true)
                         {
-                            stream = new DeflateStream(stream, CompressionMode.Decompress);
+                            stream = this.Connection.CompressionPolicy.CreateDecompressionStream(stream);
                           
                         }
                         
     					EndianBinaryReader reader = new EndianBinaryReader(stream);
@@ -113,8 +113,7 @@
 				
                 if(this.Connection != null && this.Connection.UseCompression)
                 {
-                    target = new DeflateStream(target, CompressionMode.Compress);
-					
+                    target = this.Connection.CompressionPolicy.CreateCompressionStream(target);
                           
 					this.Compressed = true;
                 }
                 

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/CompressionPolicy.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/CompressionPolicy.cs?rev=835062&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/CompressionPolicy.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/CompressionPolicy.cs
Wed Nov 11 20:56:21 2009
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.IO.Compression;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// Default compression policy for NMS.ActiveMQ.  It uses the built-in GZipStream
+    /// to compress and decompress the message body.  This is not compatible with the
+    /// compression used by ActiveMQ, so users should use this with caution.
+    /// </summary>
+    public class CompressionPolicy : ICompressionPolicy
+    {
+        public Stream CreateCompressionStream(Stream data)
+        {
+            return new GZipStream(data, CompressionMode.Compress);
+        }
+
+        public Stream CreateDecompressionStream(Stream data)
+        {
+            return new GZipStream(data, CompressionMode.Decompress);
+        }
+
+        public object Clone()
+        {
+            return this.MemberwiseClone();
+        }
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/CompressionPolicy.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=835062&r1=835061&r2=835062&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Wed Nov
11 20:56:21 2009
@@ -58,6 +58,7 @@
         private bool disposed = false;
         private IRedeliveryPolicy redeliveryPolicy;
         private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+        private ICompressionPolicy compressionPolicy = new CompressionPolicy();
 
         public Connection(Uri connectionUri, ITransport transport, ConnectionInfo info)
         {
@@ -267,6 +268,12 @@
             set { this.prefetchPolicy = value; }
         }
 
+        public ICompressionPolicy CompressionPolicy
+        {
+            get { return this.compressionPolicy; }
+            set { this.compressionPolicy = value; }
+        }
+
         #endregion
 
         /// <summary>

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=835062&r1=835061&r2=835062&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
Wed Nov 11 20:56:21 2009
@@ -41,6 +41,7 @@
         
         private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
+        private ICompressionPolicy compressionPolicy = new CompressionPolicy();
 
 		static ConnectionFactory()
 		{
@@ -103,6 +104,7 @@
             connection.UseCompression = this.useCompression;
             connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
             connection.PrefetchPolicy = this.prefetchPolicy.Clone() as PrefetchPolicy;
+            connection.CompressionPolicy = this.compressionPolicy.Clone() as ICompressionPolicy;
             
 			// Set properties on connection using parameters prefixed with "connection."
 			// Since this could be a composite Uri, assume the connection-specific parameters
@@ -162,6 +164,18 @@
             }
         }
 
+        public ICompressionPolicy CompressionPolicy
+        {
+            get { return this.compressionPolicy; }
+            set 
+            {
+                if(value != null)
+                {
+                    this.compressionPolicy = value; 
+                }
+            }
+        }        
+
 		public event ExceptionListener OnException
 		{
 			add { onException += value; }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ICompressionPolicy.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ICompressionPolicy.cs?rev=835062&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ICompressionPolicy.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ICompressionPolicy.cs
Wed Nov 11 20:56:21 2009
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// Policy interface for message compression.  Upon request, the policy should
+    /// return a new Stream that wraps the provided Stream instance and performs
+    /// compression or decompression.
+    /// </summary>
+    public interface ICompressionPolicy : ICloneable
+    {
+        Stream CreateCompressionStream(Stream data);
+
+        Stream CreateDecompressionStream(Stream data);
+    }
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ICompressionPolicy.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message