activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r508414 - in /activemq/activemq-dotnet/trunk: ./ src/main/csharp/ActiveMQ/ src/main/csharp/ActiveMQ/Commands/ src/main/csharp/ActiveMQ/OpenWire/ src/main/csharp/ActiveMQ/Transport/ src/main/csharp/ActiveMQ/Transport/Stomp/ src/main/csharp/A...
Date Fri, 16 Feb 2007 14:22:27 GMT
Author: jstrachan
Date: Fri Feb 16 06:22:26 2007
New Revision: 508414

URL: http://svn.apache.org/viewvc?view=rev&rev=508414
Log:
Nearly working STOMP support for NMS

Added:
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
    activemq/activemq-dotnet/trunk/src/sandbox/
    activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/
    activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/CountDownLatch2.cs
    activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/Dispatcher.cs
    activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/DispatchingThread.cs
    activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/EventSemaphore.cs
    activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/FutureResponse.cs   (with props)
    activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSPropertyTest.cs
    activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/StompHelperTest.cs
Removed:
    activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/JMSPropertyTest.cs
Modified:
    activemq/activemq-dotnet/trunk/nant.build
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/BrokerException.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/NMS/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/trunk/src/test/csharp/ActiveMQ/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs
    activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/JMSPropertyTest.cs

Modified: activemq/activemq-dotnet/trunk/nant.build
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/nant.build?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/nant.build (original)
+++ activemq/activemq-dotnet/trunk/nant.build Fri Feb 16 06:22:26 2007
@@ -117,7 +117,7 @@
   <!-- ============================================================================================ -->
 
   <target name="build" 
-          depends="build-nms,build-nms-test,build-activemq,build-activemq-test,build-msmq,build-msmq-test" 
+          depends="build-nms,build-nms-test,build-activemq,build-activemq-test,build-stomp-test,build-msmq,build-msmq-test" 
           description="Build everything"/>
 
   <!-- Compile the nms module -->

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/BrokerException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/BrokerException.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/BrokerException.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/BrokerException.cs Fri Feb 16 06:22:26 2007
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 using ActiveMQ.Commands;
+using System.Text;
 using NMS;
 
 namespace ActiveMQ
@@ -25,11 +26,30 @@
 	/// </summary>
 	public class BrokerException : NMSException
     {
-        
         private BrokerError brokerError;
         
+		/// <summary>
+		/// Generates a nice textual stack trace
+		/// </summary>
+		public static string StackTraceDump(StackTraceElement[] elements)
+		{
+			StringBuilder builder = new StringBuilder();
+			if (elements != null) 
+			{
+				foreach (StackTraceElement e in elements) 
+				{
+					builder.Append("\n " + e.ClassName + "." + e.MethodName + "(" + e.FileName + ":" + e.LineNumber + ")");
+				}
+			}
+			return builder.ToString();
+		}
+		
+        public BrokerException() : base("Broker failed with missing exception log")
+        {
+        }
+        
         public BrokerException(BrokerError brokerError) : base(
-            brokerError.ExceptionClass + " : " + brokerError.Message)
+            brokerError.ExceptionClass + " : " + brokerError.Message + "\n" + StackTraceDump(brokerError.StackTraceElements))
         {
             this.brokerError = brokerError;
         }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Commands/ActiveMQTextMessage.cs Fri Feb 16 06:22:26 2007
@@ -74,6 +74,8 @@
                 byte[] data = null;
                 if (text != null)
                 {
+					// TODO lets make the evaluation of the Content lazy!
+					
 					// TODO assume that the text is ASCII
 					
                     byte[] sizePrefix = System.BitConverter.GetBytes(text.Length);

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/CommonAssemblyInfo.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/CommonAssemblyInfo.cs Fri Feb 16 06:22:26 2007
@@ -1,11 +1,11 @@
-using System;
+using System;
 using System.Reflection;
 using System.Runtime.InteropServices;
 
 // ------------------------------------------------------------------------------
 //  <autogenerated>
 //      This code was generated by a tool.
-//      Mono Runtime Version: 1.1.4322.2032
+//      Mono Runtime Version: 2.0.50727.42
 // 
 //      Changes to this file may cause incorrect behavior and will be lost if 
 //      the code is regenerated.

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/OpenWire/OpenWireFormat.cs Fri Feb 16 06:22:26 2007
@@ -39,6 +39,7 @@
         private int minimumVersion=1;
 
         private WireFormatInfo preferedWireFormatInfo = new WireFormatInfo();
+		private ITransport transport;
         
         public OpenWireFormat()
         {
@@ -52,7 +53,12 @@
             dataMarshallers = new BaseDataStreamMarshaller[256];
             Version = 1;
         }
-                
+
+        public ITransport Transport {
+			get { return transport; }
+			set { transport = value; }
+		}
+				
         public bool StackTraceEnabled {
             get { return stackTraceEnabled; }
 			set { stackTraceEnabled = value; }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs Fri Feb 16 06:22:26 2007
@@ -33,6 +33,10 @@
 		/// Unmarshalls the next command object from the stream
 		/// </summary>
         Object Unmarshal(BinaryReader dis);
+
+		ITransport Transport {
+			get; set;
+		}
     }
 }
 

Added: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs?view=auto&rev=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs (added)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs Fri Feb 16 06:22:26 2007
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Reflection;
+using ActiveMQ.Commands;
+using ActiveMQ.OpenWire.V1;
+using ActiveMQ.Transport;
+using NMS;
+using System;
+using System.Collections;
+using System.IO;
+using System.Text;
+
+namespace ActiveMQ.Transport.Stomp
+{
+    /// <summary>
+    /// A Stream for writing a <a href="http://stomp.codehaus.org/">STOMP</a> Frame
+    /// </summary>
+    public class StompFrameStream
+    {
+		public const String NEWLINE = "\n";
+		public const String SEPARATOR = ":";
+		public const char NULL = (char) 0;
+		
+		private StringBuilder builder = new StringBuilder();
+		private BinaryWriter ds;
+		private byte[] content;
+		private int contentLength = -1;
+		private Encoding encoding;
+		
+		public StompFrameStream(BinaryWriter ds, Encoding encoding)
+		{
+			this.ds = ds;
+			this.encoding = encoding;
+		}
+
+		
+		public byte[] Content
+		{
+			get { return content; }
+			set { content = value; }
+		}
+		
+		public int ContentLength 
+		{
+			get { return contentLength; }
+			set 
+			{
+				contentLength = value;
+				WriteHeader("content-length", contentLength);
+			}
+		}
+		
+		public void WriteCommand(Command command, String name)
+		{
+			builder.Append(name);
+			builder.Append(NEWLINE);
+			if (command.ResponseRequired)
+			{
+				WriteHeader("receipt", command.CommandId);
+			}
+		}
+		
+		public void WriteHeader(String name, Object value)
+		{
+			if (value != null) {
+				builder.Append(name);
+				builder.Append(SEPARATOR);
+				builder.Append(value);
+				builder.Append(NEWLINE);
+			}
+		}
+		
+		public void Flush()
+		{
+			builder.Append(NEWLINE);
+			ds.Write(encoding.GetBytes(builder.ToString()));
+			
+			if (content != null) 
+			{
+				ds.Write(content);
+			}
+			
+			// if no content length then lets write a null
+			if (contentLength < 0) 
+			{
+				ds.Write(NULL);
+			}
+		}
+
+		
+    }
+}

Added: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs?view=auto&rev=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs (added)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs Fri Feb 16 06:22:26 2007
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System.Reflection;
+using ActiveMQ.Commands;
+using ActiveMQ.OpenWire.V1;
+using ActiveMQ.Transport;
+using NMS;
+using System;
+using System.Collections;
+using System.IO;
+using System.Text;
+
+namespace ActiveMQ.Transport.Stomp
+{
+    /// <summary>
+    /// Some <a href="http://stomp.codehaus.org/">STOMP</a> protocol conversion helper methods.
+    /// </summary>
+    public class StompHelper
+    {
+		public static ActiveMQDestination ToDestination(string text) 
+		{
+			int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
+			if (text.StartsWith("/queue/")) 
+			{
+				text = text.Substring("/queue/".Length);
+			}
+			else if (text.StartsWith("/topic/"))
+			{
+				text = text.Substring("/topic/".Length);
+				type = ActiveMQDestination.ACTIVEMQ_TOPIC;
+			}
+			else if (text.StartsWith("/temptopic/"))
+			{
+				text = text.Substring("/temptopic/".Length);
+				type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
+			}
+			else if (text.StartsWith("/tempqueue/"))
+			{
+				text = text.Substring("/tempqueue/".Length);
+				type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
+			}
+			return ActiveMQDestination.CreateDestination(type, text);
+		}
+
+		public static string ToStomp(ActiveMQDestination destination)
+		{
+			if (destination == null)
+			{
+				return null;
+			}
+			else 
+			{
+				switch (destination.GetDestinationType())
+				{
+					case ActiveMQDestination.ACTIVEMQ_TOPIC:
+						return "/topic/" + destination.PhysicalName;
+					
+					case ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC:
+						return "/temptopic/" + destination.PhysicalName;
+					
+					case ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE:
+						return "/tempqueue/" + destination.PhysicalName;
+					
+					default:
+						return "/queue/" + destination.PhysicalName;
+				}
+			}
+		}
+		
+		public static string ToStomp(ConsumerId id)
+		{
+			return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
+		}
+		
+		public static ConsumerId ToConsumerId(string text)
+		{
+			if (text == null)
+			{
+				return null;
+			}
+			ConsumerId answer = new ConsumerId();
+			int idx = text.LastIndexOf(':');
+			if (idx >= 0) {
+				answer.Value = Int32.Parse(text.Substring(idx + 1));
+				text = text.Substring(0, idx);
+				idx = text.LastIndexOf(':');
+				if (idx >= 0) {
+					answer.SessionId = Int32.Parse(text.Substring(idx + 1));
+					text = text.Substring(0, idx);
+				}
+			}
+			answer.ConnectionId = text;
+			return answer;
+		}
+		
+		public static string ToStomp(ProducerId id)
+		{
+			return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
+		}
+		
+		public static ProducerId ToProducerId(string text)
+		{
+			if (text == null)
+			{
+				return null;
+			}
+			ProducerId answer = new ProducerId();
+			int idx = text.LastIndexOf(':');
+			if (idx >= 0) {
+				answer.Value = Int32.Parse(text.Substring(idx + 1));
+				text = text.Substring(0, idx);
+				idx = text.LastIndexOf(':');
+				if (idx >= 0) {
+					answer.SessionId = Int32.Parse(text.Substring(idx + 1));
+					text = text.Substring(0, idx);
+				}
+			}
+			answer.ConnectionId = text;
+			return answer;
+		}
+		
+		public static string ToStomp(MessageId id)
+		{
+			return ToStomp(id.ProducerId) + ":" + id.BrokerSequenceId + ":" + id.ProducerSequenceId;
+		}
+		
+		public static MessageId ToMessageId(string text)
+		{
+			if (text == null)
+			{
+				return null;
+			}
+			MessageId answer = new MessageId();
+			int idx = text.LastIndexOf(':');
+			if (idx >= 0) {
+				answer.ProducerSequenceId = Int32.Parse(text.Substring(idx + 1));
+				text = text.Substring(0, idx);
+				idx = text.LastIndexOf(':');
+				if (idx >= 0) {
+					answer.BrokerSequenceId = Int32.Parse(text.Substring(idx + 1));
+					text = text.Substring(0, idx);
+				}
+			}
+			answer.ProducerId = ToProducerId(text);
+			return answer;
+		}
+    }
+}

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs Fri Feb 16 06:22:26 2007
@@ -24,33 +24,48 @@
 using System.IO;
 using System.Text;
 
-namespace ActiveMQ.OpenWire
+namespace ActiveMQ.Transport.Stomp
 {
     /// <summary>
     /// Implements the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
     /// </summary>
     public class StompWireFormat : IWireFormat
     {
-		protected const String NEWLINE = "\n";
-		protected const String SEPARATOR = ":";
-		protected const char NULL = (char) 0;
-		protected Encoding encoding = new UTF8Encoding();
+		private Encoding encoding = new UTF8Encoding();
+		private ITransport transport;
 		
 		public StompWireFormat()
 		{
 		}
 		
+		public ITransport Transport {
+			get { return transport; }
+			set { transport = value; }
+		}
+		
         public int Version {
             get { return 1; }
         }
 
-        public void Marshal(Object o, BinaryWriter ds)
+        public void Marshal(Object o, BinaryWriter binaryWriter)
         {
+			Console.WriteLine("About to marshall command: " + o);
+			//Console.Out.Flush();
+			StompFrameStream ds = new StompFrameStream(binaryWriter, encoding);
+			
 			if (o is ConnectionInfo) 
 			{
 				WriteConnectionInfo((ConnectionInfo) o, ds);
 			}
-			else if (o is ActiveMQMessage)
+/*			else if (o is SessionInfo) 
+			{
+				SessionInfo info = (SessionInfo) o;
+				if (info.ResponseRequired) 
+				{
+					Console.WriteLine("Response required from Session!!");
+				}
+			}
+*/			else if (o is ActiveMQMessage)
 			{
 				WriteMessage((ActiveMQMessage) o, ds);
 			}
@@ -62,6 +77,16 @@
 			{
 				WriteMessageAck((MessageAck) o, ds);
 			}
+			else if (o is Command) 
+			{
+				Command command = o as Command;
+				if (command.ResponseRequired) 
+				{
+					Response response = new Response();
+					response.CorrelationId = command.CommandId; 
+					SendCommand(response);
+				}
+			}
 			else 
 			{
 				Console.WriteLine("Ignored command: " + o);
@@ -77,6 +102,8 @@
 			}
 			while (command == "");
 			
+			Console.WriteLine(">> command: " + command);
+			
 			IDictionary headers = new Hashtable();
 			string line;
 			while((line = socketReader.ReadLine()) != "") 
@@ -87,12 +114,15 @@
 					string key = line.Substring(0, idx);
 					string value = line.Substring(idx + 1);
 					headers[key] = value;
+					
+					Console.WriteLine(">> header: " + key + " = " + value);
 				}
 				else 
 				{
 					// lets ignore this bad header!
 				}
 			}
+			Console.Out.Flush();
 
 			byte[] content = null;
 			string length = ToString(headers["content-length"]);
@@ -112,15 +142,25 @@
 				string text = body.ToString().TrimEnd('\r', '\n');
 				content = encoding.GetBytes(text);
 			}
-			return CreateCommand(command, headers, content);
+			Console.WriteLine(">>>>> read content: " + content.Length);
+			Console.Out.Flush();
+			
+			Object answer = CreateCommand(command, headers, content);
+			Console.WriteLine(">>>>> received: " + answer);
+			Console.Out.Flush();
+			return answer;
         }
 
 		protected Object CreateCommand(string command, IDictionary headers, byte[] content) 
 		{
-			if (command == "RECEIPT")
+			if (command == "RECEIPT" || command == "CONNECTED")
 			{
 				Response answer = new Response();
-				answer.CorrelationId = Int32.Parse(ToString(headers["receipt-id"]));
+				string text = RemoveHeader(headers, "receipt-id");
+				if (text != null) 
+				{
+					answer.CorrelationId = Int32.Parse(text);
+				}
 				return answer; 
 			}
 			else if (command == "MESSAGE")
@@ -130,9 +170,15 @@
 			else if (command == "ERROR")
 			{
 				ExceptionResponse answer = new ExceptionResponse();
+				string text = RemoveHeader(headers, "receipt-id");
+				if (text != null) 
+				{
+					answer.CorrelationId = Int32.Parse(text);
+				}
+				
 				BrokerError error = new BrokerError();
-				error.Message = ToString(headers["message"]);
-				error.ExceptionClass = ToString(headers["exceptionClass"]); // TODO is this the right header?
+				error.Message = RemoveHeader(headers, "message");
+				error.ExceptionClass = RemoveHeader(headers, "exceptionClass"); // TODO is this the right header?
 				answer.Exception = error;
 				return answer; 
 			}
@@ -143,136 +189,168 @@
 			}
 		}
 		
-		protected ActiveMQMessage ReadMessage(string command, IDictionary headers, byte[] content) 
+		protected Command ReadMessage(string command, IDictionary headers, byte[] content) 
 		{
 			ActiveMQMessage message = null;
 			if (headers.Contains("content-length"))
 			{
 				message = new ActiveMQBytesMessage();
+				message.Content = content;
 			}
 			else 
 			{
-				message = new ActiveMQTextMessage();
+				message = new ActiveMQTextMessage(encoding.GetString(content));
 			}
-			message.Content = content;
+
+			Console.WriteLine("Content is: " + content.Length + " byte(s)");
 			
-			// TODO now lets set the various headers
-			message.Type = ToString(headers["type"]);
-			message.Destination = ActiveMQDestination.CreateDestination(ActiveMQDestination.ACTIVEMQ_QUEUE, ToString(headers["destination"]));
-			message.ReplyTo = ActiveMQDestination.CreateDestination(ActiveMQDestination.ACTIVEMQ_QUEUE, ToString(headers["reply-to"]));
-			
-			// lets remove all the standard headers as unfortunately there's no Remove method which returns the removed value
-			string[] standardHeaders = { "type", "destination", "reply-to", "receipt-id" };
-			foreach (string header in standardHeaders) 
+			if (message is ActiveMQTextMessage)
 			{
-				headers.Remove(header);
+				ActiveMQTextMessage textMessage = message as ActiveMQTextMessage;
+				Console.WriteLine("Text is: " + textMessage.Text);
 			}
 			
+			// TODO now lets set the various headers
+			
+			message.Type = RemoveHeader(headers, "type");
+			message.Destination = StompHelper.ToDestination(RemoveHeader(headers, "destination"));
+			message.ReplyTo = StompHelper.ToDestination(RemoveHeader(headers, "reply-to"));
+			message.TargetConsumerId = StompHelper.ToConsumerId(RemoveHeader(headers, "subscription"));
+			message.CorrelationId = ToString(headers["correlation-id"]);
+			message.MessageId = StompHelper.ToMessageId(RemoveHeader(headers, "message-id"));
+			
+			string header = RemoveHeader(headers, "priority");
+			if (header != null) message.Priority = Byte.Parse(header);
+			
+			header = RemoveHeader(headers, "timestamp");
+			if (header != null) message.Timestamp = Int64.Parse(header);
+
+			header = RemoveHeader(headers, "expires");
+			if (header != null) message.Expiration = Int64.Parse(header);
+			
+			header = RemoveHeader(headers, "timestamp");
+			if (header != null) message.Timestamp = Int64.Parse(header);
+			
 			// now lets add the generic headers
 			foreach (string key in headers.Keys)
 			{
 				message.Properties[key] = headers[key];
 			}
-			return message;
+			MessageDispatch dispatch = new MessageDispatch();
+			dispatch.Message = message;
+			dispatch.ConsumerId = message.TargetConsumerId;
+			dispatch.Destination = message.Destination;
+			return dispatch;
 		}
 		
-		protected void WriteConnectionInfo(ConnectionInfo command, BinaryWriter ds)
+		
+		
+		
+		protected void WriteConnectionInfo(ConnectionInfo command, StompFrameStream ss)
 		{
-			WriteCommand(ds, "CONNECT");
-			WriteHeader(ds, "client-id", command.ClientId);
-			WriteHeader(ds, "login", command.UserName);
-			WriteHeader(ds, "passcode", command.Password);
-			WriteCommonHeaders(ds, command);
-			WriteFrameEnd(ds);
+			// lets force a receipt
+			command.ResponseRequired = true;
+			
+			ss.WriteCommand(command, "CONNECT");
+			ss.WriteHeader("client-id", command.ClientId);
+			ss.WriteHeader("login", command.UserName);
+			ss.WriteHeader("passcode", command.Password);
+			ss.Flush();
 		}
 
-		protected void WriteConsumerInfo(ConsumerInfo command, BinaryWriter ds)
+		protected void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
 		{
-			WriteCommand(ds, "SUBSCRIBE");
-			WriteHeader(ds, "destination", command.Destination);
-			WriteHeader(ds, "selector", command.Selector);
-			WriteHeader(ds, "id", command.ConsumerId);
-			WriteHeader(ds, "durable-subscriber-name", command.SubscriptionName);
-			WriteHeader(ds, "no-local", command.NoLocal);
-			WriteHeader(ds, "ack", "client");
-			
+			ss.WriteCommand(command, "SUBSCRIBE");
+			ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+			ss.WriteHeader("selector", command.Selector);
+			ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
+			ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
+			ss.WriteHeader("no-local", command.NoLocal);
+			ss.WriteHeader("ack", "client");
+
 			// ActiveMQ extensions to STOMP
-			WriteHeader(ds, "activemq.dispatchAsync", command.DispatchAsync);
-			WriteHeader(ds, "activemq.exclusive", command.Exclusive);
-			WriteHeader(ds, "activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
-			WriteHeader(ds, "activemq.prefetchSize", command.PrefetchSize);
-			WriteHeader(ds, "activemq.priority ", command.Priority);
-			WriteHeader(ds, "activemq.retroactive", command.Retroactive);
-			
-			WriteCommonHeaders(ds, command);
-			WriteFrameEnd(ds);
-		}
-
-		protected void WriteMessage(ActiveMQMessage command, BinaryWriter ds)
-		{
-			WriteCommand(ds, "SEND");
-			WriteHeader(ds, "correlation-id", command.CorrelationId);
-			WriteHeader(ds, "reply-to", command.ReplyTo);
-			WriteHeader(ds, "expires", command.Expiration);
-			WriteHeader(ds, "priority", command.Priority);
-			WriteHeader(ds, "type", command.Type);
-			WriteHeader(ds, "transaction", command.TransactionId);
+			ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
+			ss.WriteHeader("activemq.exclusive", command.Exclusive);
+			ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
+			ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
+			ss.WriteHeader("activemq.priority ", command.Priority);
+			ss.WriteHeader("activemq.retroactive", command.Retroactive);
+			
+			ss.Flush();
+		}
+
+		protected void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
+		{
+			ss.WriteCommand(command, "SEND");
+			ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+			ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
+			ss.WriteHeader("correlation-id", command.CorrelationId);
+			ss.WriteHeader("expires", command.Expiration);
+			ss.WriteHeader("priority", command.Priority);
+			ss.WriteHeader("type", command.Type);
+			ss.WriteHeader("transaction", command.TransactionId);
+			
+			// lets force the content to be marshalled
 			
-			if (!(command is ActiveMQTextMessage)) 
+			command.BeforeMarshall(null);
+			if (command is ActiveMQTextMessage) 
 			{
-				WriteHeader(ds, "content-length", command.Content.Length);
+				ActiveMQTextMessage textMessage = command as ActiveMQTextMessage;
+				ss.Content = encoding.GetBytes(textMessage.Text);
+				
+				Console.WriteLine("============ the text is : " + textMessage.Text + " which is: " + ss.Content.Length + " bytes for: " + command);
+			}
+			else 
+			{
+				ss.Content = command.Content;
+				ss.ContentLength = command.Content.Length;
 			}
 	
-			// TODO write content
-			
 			IPrimitiveMap map = command.Properties;
 			foreach (string key in map.Keys)
 			{
-				WriteHeader(ds, key, map[key]);
+				ss.WriteHeader(key, map[key]);
 			}
-			WriteCommonHeaders(ds, command);
-			ds.Write(NEWLINE);
-			ds.Write(command.Content);
-			ds.Write(NULL);
+			ss.Flush();
 		}
 		
-		protected void WriteMessageAck(MessageAck command, BinaryWriter ds)
+		protected void WriteMessageAck(MessageAck command, StompFrameStream ss)
 		{
-			WriteCommand(ds, "ACK");
+			ss.WriteCommand(command, "ACK");
 			
 			// TODO handle bulk ACKs?
-			WriteHeader(ds, "message-id", command.FirstMessageId);
-			WriteHeader(ds, "transaction", command.TransactionId);
-			
-			WriteCommonHeaders(ds, command);
-			WriteFrameEnd(ds);
-		}
-		
-		protected void WriteCommand(BinaryWriter ds, String command)
-		{
-			ds.Write(command + NEWLINE);
-		}
-		
-		protected void WriteFrameEnd(BinaryWriter ds)
-		{
-			ds.Write(NEWLINE);
-			ds.Write(NULL);
+			ss.WriteHeader("message-id", command.FirstMessageId);
+			ss.WriteHeader("transaction", command.TransactionId);
+
+			ss.Flush();
 		}
 		
-		protected void WriteHeader(BinaryWriter ds, String name, Object value)
+		protected void SendCommand(Command command)
 		{
-			if (value != null) {
-				ds.Write(name + SEPARATOR + value + NEWLINE);
+			if (transport == null)
+			{
+				Console.WriteLine("No transport configured so cannot return command: " + command);
+			}
+			else 
+			{
+				transport.Command(transport, command);
 			}
 		}
 		
-		protected void WriteCommonHeaders(BinaryWriter ds, Command command) 
+		protected string RemoveHeader(IDictionary headers, string name)
 		{
-			if (command.ResponseRequired)
+			object value = headers[name];
+			if (value == null) 
 			{
-				WriteHeader(ds, "receipt", command.CommandId);
+				return null;
+			}
+			else 
+			{
+				headers.Remove(name);
+				return value.ToString();
 			}
 		}
+		
 		
 		protected string ToString(object value) 
 		{

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs Fri Feb 16 06:22:26 2007
@@ -21,6 +21,7 @@
 using ActiveMQ.Commands;
 using ActiveMQ.OpenWire;
 using ActiveMQ.Transport;
+using ActiveMQ.Transport.Stomp;
 using ActiveMQ.Util;
 
 namespace ActiveMQ.Transport.Tcp {
@@ -45,6 +46,7 @@
                         Socket socket = Connect(location.Host, location.Port);
 						IWireFormat wireformat = CreateWireFormat(location, map);
                         TcpTransport tcpTransport = new TcpTransport(socket, wireformat);
+						wireformat.Transport = tcpTransport;
                         ITransport rc = tcpTransport;
 
                         if (UseLogging)
@@ -66,7 +68,7 @@
                 {
                         // Looping through the AddressList allows different type of connections to be tried
                         // (IPv4, IPv6 and whatever else may be available).
-                        IPHostEntry hostEntry = Dns.GetHostEntry(host);
+                        IPHostEntry hostEntry = Dns.GetHostByName(host);
                         foreach (IPAddress address in hostEntry.AddressList)
                         {
                                 Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/NMS/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/CommonAssemblyInfo.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/NMS/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/NMS/CommonAssemblyInfo.cs Fri Feb 16 06:22:26 2007
@@ -1,11 +1,11 @@
-using System;
+using System;
 using System.Reflection;
 using System.Runtime.InteropServices;
 
 // ------------------------------------------------------------------------------
 //  <autogenerated>
 //      This code was generated by a tool.
-//      Mono Runtime Version: 1.1.4322.2032
+//      Mono Runtime Version: 2.0.50727.42
 // 
 //      Changes to this file may cause incorrect behavior and will be lost if 
 //      the code is regenerated.

Added: activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/CountDownLatch2.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/CountDownLatch2.cs?view=auto&rev=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/CountDownLatch2.cs (added)
+++ activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/CountDownLatch2.cs Fri Feb 16 06:22:26 2007
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+
+namespace ActiveMQ.Util
+{
+	/// <summary>
+	/// An alternative implementation using the EventSemaphore class
+	/// <summary>
+    class CountDownLatch2
+    {
+		readonly EventSemaphore mutex = new EventSemaphore();
+        int remaining;
+        
+        public CountDownLatch2(int i)
+        {
+            remaining=i;
+        }
+
+        public void countDown()
+        {
+            lock(mutex)
+            {
+                if( remaining > 0 ) 
+				{
+                    remaining--;
+                    if( remaining <= 0 )
+                    {
+	                    mutex.PulseAll();
+					}
+                }
+            }	
+        }
+
+        public int Remaining
+        {
+            get { 
+                lock(mutex)
+                {
+                    return remaining;
+                }            
+            }
+        }
+        
+		/// <summary>
+		/// Waits forever for the latch to be completed
+		/// <summary>
+        public bool await()
+        {
+            lock (mutex)
+            {
+				TimeSpan elapsed = new TimeSpan(0, 0, 5);
+                while (remaining > 0)
+                {
+                    mutex.Wait(elapsed);
+                }
+            }
+			return true;
+        }
+        
+		/// <summary>
+		/// Waits the specified amount of time for the latch
+		/// returning true if the latch was acquired
+		/// <summary>
+        public bool await(TimeSpan timeout)
+        {
+			DateTime end = DateTime.Now.Add(timeout);
+            lock (mutex)
+            {
+                while (remaining > 0)
+                {
+					TimeSpan elapsed = end.Subtract(DateTime.Now);
+					if (elapsed.Milliseconds < 0) {
+						break;
+					}
+					Console.WriteLine("About to wait on semaphore for: " + elapsed);
+					
+                    mutex.Wait(elapsed);
+                }
+	            return remaining > 0;
+            }
+        }
+
+        
+		/// <summary>
+		/// Waits the specified amount of time for the latch
+		/// returning true if the latch was acquired
+		/// <summary>
+        public bool await(int millis)
+        {
+			long ticks = millis * (1000000 / 100); // 1,000,000 nanos in a millisecond
+			TimeSpan span = new TimeSpan(ticks);
+			return await(span);
+		}
+    }
+}

Added: activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/Dispatcher.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/Dispatcher.cs?view=auto&rev=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/Dispatcher.cs (added)
+++ activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/Dispatcher.cs Fri Feb 16 06:22:26 2007
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using ActiveMQ.Commands;
+using ActiveMQ.Util;
+using NMS;
+using System;
+using System.Collections;
+using System.Threading;
+
+namespace ActiveMQ
+{
+	
+	/// <summary>
+	/// Handles the multi-threaded dispatching between the transport and the consumers
+	/// </summary>
+	public class Dispatcher
+    {
+        Queue queue = new Queue();
+        ArrayList messagesToRedeliver = new ArrayList();
+        
+        // TODO can't use EventWaitHandle on MONO 1.0
+        //AutoResetEvent messageReceivedEventHandle = new AutoResetEvent(false);
+		//Object semaphore = new Object();
+		AutoResetEvent messageReceivedEventHandle;
+		readonly EventSemaphore semaphore = new EventSemaphore();
+
+        bool m_bAsyncDelivery = false;
+        bool m_bClosed = false;
+
+		public void SetAsyncDelivery(AutoResetEvent eventHandle)
+		{
+			lock (semaphore)
+			{
+				messageReceivedEventHandle = eventHandle;
+				m_bAsyncDelivery = true;
+				if (queue.Count > 0) 
+				{
+					PulseSemaphore();
+				}
+			}
+		}
+
+        /// <summary>
+        /// Whem we start a transaction we must redeliver any rolled back messages
+        /// </summary>
+        public void RedeliverRolledBackMessages()
+		{
+            lock (semaphore)
+            {
+                Queue replacement = new Queue(queue.Count + messagesToRedeliver.Count);
+                foreach (ActiveMQMessage element in messagesToRedeliver)
+                {
+                    replacement.Enqueue(element);
+                }
+                messagesToRedeliver.Clear();
+                
+                while (queue.Count > 0)
+                {
+                    ActiveMQMessage element = (ActiveMQMessage) queue.Dequeue();
+                    replacement.Enqueue(element);
+                }
+                queue = replacement;
+                if (queue.Count > 0)
+				{
+					PulseSemaphore();
+				}
+            }
+        }
+        
+        /// <summary>
+        /// Redeliver the given message, putting it at the head of the queue
+        /// </summary>
+        public void Redeliver(ActiveMQMessage message)
+        {
+            lock (semaphore)
+			{
+				messagesToRedeliver.Add(message);
+            }
+        }
+        
+        /// <summary>
+        /// Method Enqueue
+        /// </summary>
+        public void Enqueue(ActiveMQMessage message)
+        {
+            lock (semaphore)
+            {
+                queue.Enqueue(message);
+				PulseSemaphore();
+            }
+        }
+        
+        /// <summary>
+        /// Method DequeueNoWait
+        /// </summary>
+        public IMessage DequeueNoWait()
+        {
+            IMessage rc = null;
+            lock (semaphore)
+            {
+                if (!m_bClosed && queue.Count > 0)
+                {
+                    rc = (IMessage) queue.Dequeue();
+                } 
+            }
+            return rc;
+        }
+
+        /// <summary>
+        /// Method Dequeue
+        /// </summary>
+        public IMessage Dequeue(TimeSpan timeout)
+        {
+            IMessage rc;
+			bool bClosed = false;
+			lock (semaphore)
+			{
+				bClosed = m_bClosed;
+				rc = DequeueNoWait();
+			}
+
+            while (!bClosed && rc == null)
+            {
+/*	
+                if( !messageReceivedEventHandle.WaitOne((int)timeout.TotalMilliseconds, false) )
+                {
+                    break;
+                }
+*/
+				if (messageReceivedEventHandle != null) 
+				{
+	                if( !messageReceivedEventHandle.WaitOne(timeout, false) )
+	                {
+	                    break;
+	                }
+				}
+				else
+				{
+					semaphore.Wait(timeout);
+				}
+				lock (semaphore)
+				{
+					rc = DequeueNoWait();
+					bClosed = m_bClosed;
+				}
+            }
+            return rc;
+        }
+        
+        /// <summary>
+        /// Method Dequeue
+        /// </summary>
+        public IMessage Dequeue()
+        {
+			return Dequeue(TimeSpan.MaxValue);
+        }
+
+		internal void Close()
+		{
+			lock (semaphore)
+			{
+				m_bClosed = true;
+				if(m_bAsyncDelivery)
+				{
+					PulseSemaphore();
+				}
+			}
+		}
+
+		protected void PulseSemaphore() 
+		{
+			if (messageReceivedEventHandle != null) 
+			{
+				messageReceivedEventHandle.Set();	
+			}
+			semaphore.PulseAll();
+		}
+	}
+}

Added: activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/DispatchingThread.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/DispatchingThread.cs?view=auto&rev=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/DispatchingThread.cs (added)
+++ activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/DispatchingThread.cs Fri Feb 16 06:22:26 2007
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using ActiveMQ.Util;
+using System;
+using System.Threading;
+
+
+namespace ActiveMQ
+{
+	internal class DispatchingThread
+	{
+		public delegate void DispatchFunction();
+		public delegate void ExceptionHandler(Exception exception);
+
+		private readonly AutoResetEvent eventHandle = new AutoResetEvent(false);
+		readonly EventSemaphore m_event = new EventSemaphore();
+		
+		private bool m_bStopFlag = false;
+		private Thread m_thread = null;
+		private readonly DispatchFunction m_dispatchFunc;
+		private event ExceptionHandler m_exceptionListener;
+
+		public DispatchingThread(DispatchFunction dispatchFunc)
+		{
+			m_dispatchFunc = dispatchFunc;
+		}
+
+               // TODO can't use EventWaitHandle on MONO 1.0
+		public AutoResetEvent EventHandle
+		{
+			get { return eventHandle; }
+		}
+
+
+		internal event ExceptionHandler ExceptionListener
+		{
+			add
+			{
+				m_exceptionListener += value;
+			}
+			remove 
+			{
+				m_exceptionListener -= value;
+			}
+		}
+
+		internal void Start()
+		{
+			lock (this)
+			{
+				if (m_thread == null)
+				{
+					m_bStopFlag = false;
+					m_thread = new Thread(new ThreadStart(MyThreadFunc));
+					//m_event.Set();
+					m_event.PulseAll();
+					Tracer.Info("Starting dispatcher thread for session");
+					m_thread.Start();
+				}
+			}
+		}
+
+		internal void Stop()
+		{
+			Stop(System.Threading.Timeout.Infinite);
+		}
+
+		
+		internal void Stop(int timeoutMilliseconds)
+		{
+			Tracer.Info("Stopping dispatcher thread for session");
+			Thread localThread = null;
+			lock (this)
+			{
+				localThread = m_thread;
+				m_thread = null;
+				if (!m_bStopFlag)
+				{
+					m_bStopFlag = true;
+					//m_event.Set();
+					m_event.PulseAll();
+				}
+			}
+			if(localThread!=null)
+			{
+				localThread.Join(timeoutMilliseconds);
+			}
+			Tracer.Info("Dispatcher thread joined");
+		}
+		
+		private void MyThreadFunc()
+		{
+			Tracer.Info("Dispatcher thread started");
+			while (true) // loop forever (well, at least until we've been asked to stop)
+			{
+				lock (this)
+				{
+					if (m_bStopFlag)
+						break;
+				}
+
+				try
+				{
+					m_dispatchFunc();
+				}
+				catch (Exception ex)
+				{
+					if (m_exceptionListener != null)
+						m_exceptionListener(ex);
+				}
+				//m_event.WaitOne();
+				m_event.Wait();
+			}
+			Tracer.Info("Dispatcher thread stopped");
+		}
+	}
+}
\ No newline at end of file

Added: activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/EventSemaphore.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/EventSemaphore.cs?view=auto&rev=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/EventSemaphore.cs (added)
+++ activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/EventSemaphore.cs Fri Feb 16 06:22:26 2007
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+
+namespace ActiveMQ.Util
+{
+    class EventSemaphore
+    {
+        //readonly ManualResetEvent mutex = new ManualResetEvent(false);
+
+		public void PulseAll() 
+		{
+            lock(this)
+            {
+                Monitor.PulseAll(this);
+                //mutex.Set();
+            }	
+        }
+
+        public void Wait()
+        {
+            lock (this)
+            {
+                    Monitor.Wait(this);
+            }
+            //return mutex.WaitOne(timeout, false);
+        }
+
+        public void Wait(TimeSpan timeout)
+        {
+            lock (this)
+            {
+                    Monitor.Wait(this, timeout, false);
+            }
+            //return mutex.WaitOne(timeout, false);
+        }
+    }
+}

Added: activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/FutureResponse.cs?view=auto&rev=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/FutureResponse.cs (added)
+++ activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/FutureResponse.cs Fri Feb 16 06:22:26 2007
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using ActiveMQ.Commands;
+using System;
+using System.Threading;
+using ActiveMQ.Util;
+
+namespace ActiveMQ.Transport
+{
+	
+	/// <summary>
+	/// Handles asynchronous responses
+	/// </summary>
+	public class FutureResponse 
+    {
+	    
+        private static int maxWait = -1;
+
+        private readonly CountDownLatch latch = new CountDownLatch(1);
+        private Response response;
+        
+/*        public WaitHandle AsyncWaitHandle
+        {
+            get { return latch.AsyncWaitHandle; }
+        }        
+*/        
+        public Response Response
+        {
+            // Blocks the caller until a value has been set
+            get {
+                lock (latch)
+                {
+	                while (response == null)
+	                {
+	                    try
+						{
+							if (maxWait > 0) 
+							{
+		                        latch.await(maxWait);
+							}
+							else 
+							{
+		                        latch.await();
+							}
+	                    }
+	                    catch (Exception e)
+						{
+	                        Tracer.Error("Caught while waiting on monitor: " + e);
+	                    }
+	                }
+                    return response;
+                }
+            }
+            
+            set {
+                lock (latch)
+                {
+                    response = value;
+                }
+                latch.countDown();
+            }
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/trunk/src/sandbox/alternative-threading/FutureResponse.cs
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/activemq-dotnet/trunk/src/test/csharp/ActiveMQ/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/test/csharp/ActiveMQ/CommonAssemblyInfo.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/test/csharp/ActiveMQ/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/trunk/src/test/csharp/ActiveMQ/CommonAssemblyInfo.cs Fri Feb 16 06:22:26 2007
@@ -1,11 +1,11 @@
-using System;
+using System;
 using System.Reflection;
 using System.Runtime.InteropServices;
 
 // ------------------------------------------------------------------------------
 //  <autogenerated>
 //      This code was generated by a tool.
-//      Mono Runtime Version: 1.1.4322.2032
+//      Mono Runtime Version: 2.0.50727.42
 // 
 //      Changes to this file may cause incorrect behavior and will be lost if 
 //      the code is regenerated.

Modified: activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/CommonAssemblyInfo.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/CommonAssemblyInfo.cs Fri Feb 16 06:22:26 2007
@@ -1,11 +1,11 @@
-using System;
+using System;
 using System.Reflection;
 using System.Runtime.InteropServices;
 
 // ------------------------------------------------------------------------------
 //  <autogenerated>
 //      This code was generated by a tool.
-//      Mono Runtime Version: 1.1.4322.2032
+//      Mono Runtime Version: 2.0.50727.42
 // 
 //      Changes to this file may cause incorrect behavior and will be lost if 
 //      the code is regenerated.

Added: activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSPropertyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSPropertyTest.cs?view=auto&rev=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSPropertyTest.cs (added)
+++ activemq/activemq-dotnet/trunk/src/test/csharp/NMS/Test/NMSPropertyTest.cs Fri Feb 16 06:22:26 2007
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+//using ActiveMQ;
+using NMS;
+using NUnit.Framework;
+using System;
+
+namespace NMS.Test
+{
+    [ TestFixture ]
+    abstract public class NMSPropertyTest : NMSTestSupport
+    {
+        // standard NMS properties
+        string expectedText = "Hey this works!";
+        string correlationID = "abc";
+        ITemporaryQueue replyTo;
+        bool persistent = true;
+        byte priority = 5;
+        String type = "FooType";
+        String groupID = "MyGroup";
+        int groupSeq = 1;
+        
+        // custom properties
+        string customText = "Cheese";
+        bool custom1 = true;
+        byte custom2 = 12;
+        short custom3 = 0x1234;
+        int custom4 = 0x12345678;
+        long custom5 = 0x1234567812345678;
+        char custom6 = 'J';
+        
+        [SetUp]
+        override public void SetUp()
+        {
+            base.SetUp();
+        }
+        
+        [TearDown]
+        override public void TearDown()
+        {
+            base.TearDown();
+        }
+        
+        [ Test ]
+        public override void SendAndSyncReceive()
+        {
+            base.SendAndSyncReceive();
+        }
+        
+        protected override IMessage CreateMessage()
+        {
+            ITextMessage message = Session.CreateTextMessage(expectedText);
+            replyTo = Session.CreateTemporaryQueue();
+            
+            // lets set the headers
+            message.NMSCorrelationID = correlationID;
+            message.NMSReplyTo = replyTo;
+            message.NMSPersistent = persistent;
+            message.NMSPriority = priority;
+            message.NMSType = type;
+            message.Properties["NMSXGroupID"] = groupID;
+            message.Properties["NMSXGroupSeq"] = groupSeq;
+            
+            // lets set the custom headers
+            message.Properties["customText"] = customText;
+            message.Properties["custom1"] = custom1;
+            message.Properties["custom2"] = custom2;
+            message.Properties["custom3"] = custom3;
+            message.Properties["custom4"] = custom4;
+            message.Properties["custom5"] = custom5;
+            message.Properties["custom6"] = custom6;
+            
+            return message;
+        }
+        
+        protected override void AssertValidMessage(IMessage message)
+        {
+            Assert.IsTrue(message is ITextMessage, "Did not receive a ITextMessage!");
+            
+            Console.WriteLine("Received Message: " + message);
+            
+            ITextMessage textMessage = (ITextMessage) message;
+            String text = textMessage.Text;
+            Assert.AreEqual(expectedText, text, "the message text");
+            
+            // compare standard NMS headers
+            Assert.AreEqual(correlationID, message.NMSCorrelationID, "NMSCorrelationID");
+            Assert.AreEqual(replyTo, message.NMSReplyTo, "NMSReplyTo");
+            Assert.AreEqual(persistent, message.NMSPersistent, "NMSPersistent");
+            Assert.AreEqual(priority, message.NMSPriority, "NMSPriority");
+            Assert.AreEqual(type, message.NMSType, "NMSType");
+            Assert.AreEqual(groupID, message.Properties["NMSXGroupID"], "NMSXGroupID");
+            Assert.AreEqual(groupSeq, message.Properties["NMSXGroupSeq"], "NMSXGroupSeq");
+            
+            // compare custom headers
+            Assert.AreEqual(customText, message.Properties["customText"], "customText");
+            Assert.AreEqual(custom1, message.Properties["custom1"], "custom1");
+            Assert.AreEqual(custom2, message.Properties["custom2"], "custom2");
+            Assert.AreEqual(custom3, message.Properties["custom3"], "custom3");
+            Assert.AreEqual(custom4, message.Properties["custom4"], "custom4");
+            // TODO
+            Assert.AreEqual(custom5, message.Properties["custom5"], "custom5");
+            Object value6 = message.Properties["custom6"];
+            Object expected6 = custom6;
+            Console.WriteLine("actual type is: " + value6.GetType() + " value: " + value6);
+            Console.WriteLine("expected type is: " + expected6.GetType() + " value: " + expected6);
+            Assert.AreEqual(custom6, value6, "custom6 which is of type: " + value6.GetType());
+            
+            Assert.AreEqual(custom1, message.Properties.GetBool("custom1"), "custom1");
+            Assert.AreEqual(custom2, message.Properties.GetByte("custom2"), "custom2");
+            Assert.AreEqual(custom3, message.Properties.GetShort("custom3"), "custom3");
+            Assert.AreEqual(custom4, message.Properties.GetInt("custom4"), "custom4");
+            Assert.AreEqual(custom5, message.Properties.GetLong("custom5"), "custom5");
+            //Assert.AreEqual(custom6, message.Properties.GetChar("custom6"), "custom6");
+            
+            // lets now look at some standard NMS headers
+            Console.WriteLine("NMSExpiration: " + message.NMSExpiration);
+            Console.WriteLine("NMSMessageId: " + message.NMSMessageId);
+            Console.WriteLine("NMSRedelivered: " + message.NMSRedelivered);
+            Console.WriteLine("NMSTimestamp: " + message.NMSTimestamp);
+            Console.WriteLine("NMSXDeliveryCount: " + message.Properties["NMSXDeliveryCount"]);
+            Console.WriteLine("NMSXProducerTXID: " + message.Properties["NMSXProducerTXID"]);
+        }
+    }
+}
+
+
+

Modified: activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/CommonAssemblyInfo.cs Fri Feb 16 06:22:26 2007
@@ -1,11 +1,11 @@
-using System;
+using System;
 using System.Reflection;
 using System.Runtime.InteropServices;
 
 // ------------------------------------------------------------------------------
 //  <autogenerated>
 //      This code was generated by a tool.
-//      Mono Runtime Version: 1.1.4322.2032
+//      Mono Runtime Version: 2.0.50727.42
 // 
 //      Changes to this file may cause incorrect behavior and will be lost if 
 //      the code is regenerated.

Modified: activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/JMSPropertyTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/JMSPropertyTest.cs?view=diff&rev=508414&r1=508413&r2=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/JMSPropertyTest.cs (original)
+++ activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/JMSPropertyTest.cs Fri Feb 16 06:22:26 2007
@@ -26,6 +26,8 @@
     {
         protected override IConnectionFactory CreateConnectionFactory()
         {
+			Console.WriteLine("##### using the NMS STOMP client!");
+			
             return new ConnectionFactory();
         }
     }

Added: activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/StompHelperTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/StompHelperTest.cs?view=auto&rev=508414
==============================================================================
--- activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/StompHelperTest.cs (added)
+++ activemq/activemq-dotnet/trunk/src/test/csharp/Stomp/StompHelperTest.cs Fri Feb 16 06:22:26 2007
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using NMS;
+using NUnit.Framework;
+using ActiveMQ.Commands;
+using ActiveMQ.Transport.Stomp;
+using System;
+
+namespace Stomp
+{
+    [ TestFixture ]
+    public class StompHelperTest
+    {
+		[ Test ]
+		public void ConsumerIdMarshallingWorks()
+		{
+			ConsumerId id = new ConsumerId();
+			id.ConnectionId = "cheese";
+			id.SessionId = 2;
+			id.Value = 3;
+			
+			string text = StompHelper.ToStomp(id);
+			Assert.AreEqual("cheese:2:3", text, "ConsumerId as stomp");
+			
+			ConsumerId another = StompHelper.ToConsumerId("abc:5:6");
+			Assert.AreEqual("abc", another.ConnectionId, "extracting consumerId.ConnectionId");
+			Assert.AreEqual(5, another.SessionId, "extracting consumerId.SessionId");
+			Assert.AreEqual(6, another.Value, "extracting consumerId.Value");
+		}
+
+		[ Test ]
+		public void MessageIdMarshallingWorks()
+		{
+			ProducerId id = new ProducerId();
+			id.ConnectionId = "cheese";
+			id.SessionId = 2;
+			id.Value = 3;
+			
+			MessageId mid = new MessageId();
+			mid.ProducerId = id;
+			mid.BrokerSequenceId = 5;
+			mid.ProducerSequenceId = 6;
+			
+			string text = StompHelper.ToStomp(mid);
+			Assert.AreEqual("cheese:2:3:5:6", text, "MessageId as stomp");
+			
+			MessageId mid2 = StompHelper.ToMessageId("abc:5:6:7:8");
+			Assert.AreEqual(7, mid2.BrokerSequenceId, "extracting mid2.BrokerSequenceId");
+			Assert.AreEqual(8, mid2.ProducerSequenceId, "extracting mid2.ProducerSequenceId");
+
+			ProducerId another = mid2.ProducerId;
+			Assert.AreEqual("abc", another.ConnectionId, "extracting producerId.ConnectionId");
+			Assert.AreEqual(5, another.SessionId, "extracting producerId.SessionId");
+			Assert.AreEqual(6, another.Value, "extracting producerId.Value");
+		}
+
+		// TODO destination stuff
+    }
+}
+
+
+



Mime
View raw message