activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r512915 - in /activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp: StompHelper.cs StompWireFormat.cs
Date Wed, 28 Feb 2007 19:26:06 GMT
Author: jstrachan
Date: Wed Feb 28 11:26:05 2007
New Revision: 512915

URL: http://svn.apache.org/viewvc?view=rev&rev=512915
Log:
added support for unsubscribe and transactions; needs more testing though

Modified:
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
    activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs?view=diff&rev=512915&r1=512914&r2=512915
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
(original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
Wed Feb 28 11:26:05 2007
@@ -31,10 +31,12 @@
     /// </summary>
     public class StompHelper
     {
-		public static ActiveMQDestination ToDestination(string text) 
+
+
+		public static ActiveMQDestination ToDestination(string text)
 		{
 			int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
-			if (text.StartsWith("/queue/")) 
+			if (text.StartsWith("/queue/"))
 			{
 				text = text.Substring("/queue/".Length);
 			}
@@ -62,7 +64,7 @@
 			{
 				return null;
 			}
-			else 
+			else
 			{
 				switch (destination.DestinationType)
 				{
@@ -158,6 +160,20 @@
 			answer.ProducerId = ToProducerId(text);
 			return answer;
 		}
+	
+		public static string ToStomp(TransactionId id)
+		{
+			if (id is LocalTransactionId)
+			{
+				return ToStomp(id as LocalTransactionId);
+			}
+			return id.ToString();
+		}
+		
+		public static string ToStomp(LocalTransactionId transactionId)
+		{
+			return transactionId.ConnectionId.Value + ":" + transactionId.Value;
+		}
 		
 		public static bool ToBool(string text, bool defaultValue)
 		{
@@ -165,7 +181,7 @@
 			{
 				return defaultValue;
 			}
-			else 
+			else
 			{
 				return "true" == text || "TRUE" == text;
 			}

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?view=diff&rev=512915&r1=512914&r2=512915
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
(original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
Wed Feb 28 11:26:05 2007
@@ -69,6 +69,18 @@
 			{
 				WriteMessageAck((MessageAck) o, ds);
 			}
+			else if (o is TransactionInfo)
+			{
+				WriteTransactionInfo((TransactionInfo) o, ds);
+			}
+			else if (o is ShutdownInfo)
+			{
+				WriteShutdownInfo((ShutdownInfo) o, ds);
+			}
+			else if (o is RemoveInfo)
+			{
+				WriteRemoveInfo((RemoveInfo) o, ds);
+			}
 			else if (o is Command)
 			{
 				Command command = o as Command;
@@ -78,13 +90,15 @@
 					response.CorrelationId = command.CommandId;
 					SendCommand(response);
 				}
+				Console.WriteLine("#### Ignored command: " + o);
 			}
 			else
 			{
-				Console.WriteLine("Ignored command: " + o);
+				Console.WriteLine("#### Ignored command: " + o);
 			}
         }
-        
+
+
         public Object Unmarshal(BinaryReader dis)
         {
 			StreamReader socketReader = new StreamReader(dis.BaseStream);
@@ -138,7 +152,7 @@
 			return answer;
         }
 
-		protected Object CreateCommand(string command, IDictionary headers, byte[] content)
+		protected virtual Object CreateCommand(string command, IDictionary headers, byte[] content)
 		{
 			if (command == "RECEIPT" || command == "CONNECTED")
 			{
@@ -176,7 +190,7 @@
 			}
 		}
 		
-		protected Command ReadMessage(string command, IDictionary headers, byte[] content)
+		protected virtual Command ReadMessage(string command, IDictionary headers, byte[] content)
 		{
 			ActiveMQMessage message = null;
 			if (headers.Contains("content-length"))
@@ -241,7 +255,7 @@
 		
 		
 		
-		protected void WriteConnectionInfo(ConnectionInfo command, StompFrameStream ss)
+		protected virtual void WriteConnectionInfo(ConnectionInfo command, StompFrameStream ss)
 		{
 			// lets force a receipt
 			command.ResponseRequired = true;
@@ -252,8 +266,14 @@
 			ss.WriteHeader("passcode", command.Password);
 			ss.Flush();
 		}
+		
+		protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
+		{
+			ss.WriteCommand(command, "DISCONNECT");
+			ss.Flush();
+		}
 
-		protected void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
+		protected virtual void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
 		{
 			ss.WriteCommand(command, "SUBSCRIBE");
 			ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
@@ -274,7 +294,49 @@
 			ss.Flush();
 		}
 
-		protected void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
+		protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
+		{
+			object id = command.ObjectId;
+			if (id is ConsumerId)
+			{
+				ConsumerId consumerId = id as ConsumerId;
+				ss.WriteCommand(command, "UNSUBSCRIBE");
+				ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+				
+				ss.Flush();
+			}
+		}
+		
+		
+		protected virtual void WriteTransactionInfo(TransactionInfo command, StompFrameStream ss)
+		{
+			TransactionId id = command.TransactionId;
+			if (id is LocalTransactionId)
+			{
+				string type = "BEGIN";
+				TransactionType transactionType = (TransactionType) command.Type;
+				switch (transactionType)
+				{
+					case TransactionType.CommitOnePhase:
+						command.ResponseRequired = true;
+						type = "COMMIT";
+						break;
+					case TransactionType.Rollback:
+						command.ResponseRequired = true;
+						type = "ABORT";
+						break;
+				}
+				Console.WriteLine(">>> For transaction type: " + transactionType + " we are
using command type: " + type);
+				
+				ss.WriteCommand(command, type);
+				
+				ss.WriteHeader("transaction", StompHelper.ToStomp(id));
+				
+				ss.Flush();
+			}
+		}
+		
+		protected virtual void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
 		{
 			ss.WriteCommand(command, "SEND");
 			ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
@@ -283,7 +345,7 @@
 			ss.WriteHeader("expires", command.Expiration);
 			ss.WriteHeader("priority", command.Priority);
 			ss.WriteHeader("type", command.Type);
-			ss.WriteHeader("transaction", command.TransactionId);
+			ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
 			ss.WriteHeader("persistent", command.Persistent);
 			
 			// lets force the content to be marshalled
@@ -308,7 +370,7 @@
 			ss.Flush();
 		}
 		
-		protected void WriteMessageAck(MessageAck command, StompFrameStream ss)
+		protected virtual void WriteMessageAck(MessageAck command, StompFrameStream ss)
 		{
 			ss.WriteCommand(command, "ACK");
 			
@@ -319,7 +381,7 @@
 			ss.Flush();
 		}
 		
-		protected void SendCommand(Command command)
+		protected virtual void SendCommand(Command command)
 		{
 			if (transport == null)
 			{
@@ -331,7 +393,7 @@
 			}
 		}
 		
-		protected string RemoveHeader(IDictionary headers, string name)
+		protected virtual string RemoveHeader(IDictionary headers, string name)
 		{
 			object value = headers[name];
 			if (value == null)
@@ -346,7 +408,7 @@
 		}
 		
 		
-		protected string ToString(object value)
+		protected virtual string ToString(object value)
 		{
 			if (value != null)
 			{



Mime
View raw message