activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r819725 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs
Date Mon, 28 Sep 2009 20:42:56 GMT
Author: tabish
Date: Mon Sep 28 20:42:55 2009
New Revision: 819725

URL: http://svn.apache.org/viewvc?rev=819725&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-181

Set the correct Stomp headers for AMQ to create a durable subscription.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs?rev=819725&r1=819724&r2=819725&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Stomp/StompWireFormat.cs
Mon Sep 28 20:42:55 2009
@@ -29,71 +29,71 @@
     /// </summary>
     public class StompWireFormat : IWireFormat
     {
-		private Encoding encoding = new UTF8Encoding();
-		private ITransport transport;
+        private Encoding encoding = new UTF8Encoding();
+        private ITransport transport;
         private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
 
-		public StompWireFormat()
-		{
-		}
-		
-		public ITransport Transport {
-			get { return transport; }
-			set { transport = value; }
-		}
-		
+        public StompWireFormat()
+        {
+        }
+
+        public ITransport Transport {
+            get { return transport; }
+            set { transport = value; }
+        }
+
         public int Version {
             get { return 1; }
         }
 
         public void Marshal(Object o, BinaryWriter binaryWriter)
         {
-			Tracer.Debug(">>>> " + o);
-			StompFrameStream ds = new StompFrameStream(binaryWriter, encoding);
-			
-			if (o is ConnectionInfo)
-			{
-				WriteConnectionInfo((ConnectionInfo) o, ds);
-			}
-			else if (o is ActiveMQMessage)
-			{
-				WriteMessage((ActiveMQMessage) o, ds);
-			}
-			else if (o is ConsumerInfo)
-			{
-				WriteConsumerInfo((ConsumerInfo) o, ds);
-			}
-			else if (o is MessageAck)
-			{
-				WriteMessageAck((MessageAck) o, ds);
-			}
-			else if (o is TransactionInfo)
-			{
-				WriteTransactionInfo((TransactionInfo) o, ds);
-			}
-			else if (o is ShutdownInfo)
-			{
-				WriteShutdownInfo((ShutdownInfo) o, ds);
-			}
-			else if (o is RemoveInfo)
-			{
-				WriteRemoveInfo((RemoveInfo) o, ds);
-			}
-			else if (o is Command)
-			{
-				Command command = o as Command;
-				if (command.ResponseRequired)
-				{
-					Response response = new Response();
-					response.CorrelationId = command.CommandId;
-					SendCommand(response);
-					Tracer.Debug("#### Autorespond to command: " + o.GetType());
-				}
-			}
-			else
-			{
-				Tracer.Debug("#### Ignored command: " + o.GetType());
-			}
+            Tracer.Debug(">>>> " + o);
+            StompFrameStream ds = new StompFrameStream(binaryWriter, encoding);
+
+            if (o is ConnectionInfo)
+            {
+                WriteConnectionInfo((ConnectionInfo) o, ds);
+            }
+            else if (o is ActiveMQMessage)
+            {
+                WriteMessage((ActiveMQMessage) o, ds);
+            }
+            else if (o is ConsumerInfo)
+            {
+                WriteConsumerInfo((ConsumerInfo) o, ds);
+            }
+            else if (o is MessageAck)
+            {
+                WriteMessageAck((MessageAck) o, ds);
+            }
+            else if (o is TransactionInfo)
+            {
+                WriteTransactionInfo((TransactionInfo) o, ds);
+            }
+            else if (o is ShutdownInfo)
+            {
+                WriteShutdownInfo((ShutdownInfo) o, ds);
+            }
+            else if (o is RemoveInfo)
+            {
+                WriteRemoveInfo((RemoveInfo) o, ds);
+            }
+            else if (o is Command)
+            {
+                Command command = o as Command;
+                if (command.ResponseRequired)
+                {
+                    Response response = new Response();
+                    response.CorrelationId = command.CommandId;
+                    SendCommand(response);
+                    Tracer.Debug("#### Autorespond to command: " + o.GetType());
+                }
+            }
+            else
+            {
+                Tracer.Debug("#### Ignored command: " + o.GetType());
+            }
         }
 
 
@@ -116,86 +116,86 @@
             byte[] data = ms.ToArray();
             return encoding.GetString(data, 0, data.Length);
         }
-        
+
         public Object Unmarshal(BinaryReader dis)
         {
-			string command;
-			do {
+            string command;
+            do {
                 command = ReadLine(dis);
-			}
-			while (command == "");
-			
-			Tracer.Debug("<<<< command: " + command);
-			
-			IDictionary headers = new Hashtable();
-			string line;
+            }
+            while (command == "");
+
+            Tracer.Debug("<<<< command: " + command);
+
+            IDictionary headers = new Hashtable();
+            string line;
             while ((line = ReadLine(dis)) != "")
-			{
-				int idx = line.IndexOf(':');
-				if (idx > 0)
-				{
-					string key = line.Substring(0, idx);
-					string value = line.Substring(idx + 1);
-					headers[key] = value;
-					
-					Tracer.Debug("<<<< header: " + key + " = " + value);
-				}
-				else
-				{
-					// lets ignore this bad header!
-				}
-			}
-			byte[] content = null;
-			string length = ToString(headers["content-length"]);
-			if (length != null)
-			{
-				int size = Int32.Parse(length);
-				content = dis.ReadBytes(size);
-				// Read the terminating NULL byte for this frame.
-				int nullByte = dis.Read();
-				if(nullByte != 0)
-				{
-					Tracer.Debug("<<<< error reading frame null byte.");
-				}
-			}
-			else
-			{
+            {
+                int idx = line.IndexOf(':');
+                if (idx > 0)
+                {
+                    string key = line.Substring(0, idx);
+                    string value = line.Substring(idx + 1);
+                    headers[key] = value;
+
+                    Tracer.Debug("<<<< header: " + key + " = " + value);
+                }
+                else
+                {
+                    // lets ignore this bad header!
+                }
+            }
+            byte[] content = null;
+            string length = ToString(headers["content-length"]);
+            if (length != null)
+            {
+                int size = Int32.Parse(length);
+                content = dis.ReadBytes(size);
+                // Read the terminating NULL byte for this frame.
+                int nullByte = dis.Read();
+                if(nullByte != 0)
+                {
+                    Tracer.Debug("<<<< error reading frame null byte.");
+                }
+            }
+            else
+            {
                 MemoryStream ms = new MemoryStream();
-				int nextChar;
-				while((nextChar = dis.Read()) != 0)
-				{
-				    if( nextChar < 0 )
-				    {
-				        // EOF ??
-				        break;
-				    }
-					ms.WriteByte((byte)nextChar);
-				}
+                int nextChar;
+                while((nextChar = dis.Read()) != 0)
+                {
+                    if( nextChar < 0 )
+                    {
+                        // EOF ??
+                        break;
+                    }
+                    ms.WriteByte((byte)nextChar);
+                }
                 content = ms.ToArray();
-			}
-			Object answer = CreateCommand(command, headers, content);
-			Tracer.Debug("<<<< received: " + answer);
-			return answer;
-        }
-
-		protected virtual Object CreateCommand(string command, IDictionary headers, byte[] content)
-		{
-			if(command == "RECEIPT" || command == "CONNECTED")
-			{
-				string text = RemoveHeader(headers, "receipt-id");
-				if(text != null)
-				{
-    				Response answer = new Response();
-					if(text.StartsWith("ignore:"))
-					{
-						text = text.Substring("ignore:".Length);
-					}
-
-					answer.CorrelationId = Int32.Parse(text);
-				    return answer;
-				}
-				else if(command == "CONNECTED")
-				{
+            }
+            Object answer = CreateCommand(command, headers, content);
+            Tracer.Debug("<<<< received: " + answer);
+            return answer;
+        }
+
+        protected virtual Object CreateCommand(string command, IDictionary headers, byte[]
content)
+        {
+            if(command == "RECEIPT" || command == "CONNECTED")
+            {
+                string text = RemoveHeader(headers, "receipt-id");
+                if(text != null)
+                {
+                    Response answer = new Response();
+                    if(text.StartsWith("ignore:"))
+                    {
+                        text = text.Substring("ignore:".Length);
+                    }
+
+                    answer.CorrelationId = Int32.Parse(text);
+                    return answer;
+                }
+                else if(command == "CONNECTED")
+                {
                     text = RemoveHeader(headers, "response-id");
                     if (text != null)
                     {
@@ -203,165 +203,170 @@
                         answer.CorrelationId = Int32.Parse(text);
                         return answer;
                     }
-				}
-			}
-			else if(command == "ERROR")
-			{
-				string text = RemoveHeader(headers, "receipt-id");
-
-				if(text != null && text.StartsWith("ignore:"))
-				{
-					Response answer = new Response();
-					answer.CorrelationId = Int32.Parse(text.Substring("ignore:".Length));
-					return answer;
-				}
-				else
-				{
-					ExceptionResponse answer = new ExceptionResponse();
-					if(text != null)
-					{
-						answer.CorrelationId = Int32.Parse(text);
-					}
-
-					BrokerError error = new BrokerError();
-					error.Message = RemoveHeader(headers, "message");
-					error.ExceptionClass = RemoveHeader(headers, "exceptionClass");
-					// TODO is this the right header?
-					answer.Exception = error;
-					return answer;
-				}
-			}
-			else if (command == "MESSAGE")
-			{
-				return ReadMessage(command, headers, content);
-			}
-			Tracer.Error("Unknown command: " + command + " headers: " + headers);
-			return null;
-		}
-		
-		protected virtual Command ReadMessage(string command, IDictionary headers, byte[] content)
-		{
-			ActiveMQMessage message = null;
-			if (headers.Contains("content-length"))
-			{
-				message = new ActiveMQBytesMessage();
-				message.Content = content;
-			}
-			else
-			{
-				message = new ActiveMQTextMessage(encoding.GetString(content, 0, content.Length));
-			}
-
-			// TODO now lets set the various headers
-			
-			message.Type = RemoveHeader(headers, "type");
-			message.Destination = StompHelper.ToDestination(RemoveHeader(headers, "destination"));
-			message.ReplyTo = StompHelper.ToDestination(RemoveHeader(headers, "reply-to"));
-			message.TargetConsumerId = StompHelper.ToConsumerId(RemoveHeader(headers, "subscription"));
-			message.CorrelationId = RemoveHeader(headers, "correlation-id");
-			message.MessageId = StompHelper.ToMessageId(RemoveHeader(headers, "message-id"));
-			message.Persistent = StompHelper.ToBool(RemoveHeader(headers, "persistent"), true);
-			
-			string header = RemoveHeader(headers, "priority");
-			if (header != null) message.Priority = Byte.Parse(header);
-			
-			header = RemoveHeader(headers, "timestamp");
-			if (header != null) message.Timestamp = Int64.Parse(header);
-
-			header = RemoveHeader(headers, "expires");
-			if (header != null) message.Expiration = Int64.Parse(header);
-			
-			// now lets add the generic headers
-			foreach (string key in headers.Keys)
-			{
-				Object value = headers[key];
-				if (value != null)
-				{
-					// lets coerce some standard header extensions
-					if (key == "NMSXGroupSeq")
-					{
-						value = Int32.Parse(value.ToString());
-					}
-				}
-				message.Properties[key] = value;
-			}
-			MessageDispatch dispatch = new MessageDispatch();
-			dispatch.Message = message;
-			dispatch.ConsumerId = message.TargetConsumerId;
-			dispatch.Destination = message.Destination;
-			return dispatch;
-		}
-		
-		
-		
-		
-		protected virtual void WriteConnectionInfo(ConnectionInfo command, StompFrameStream ss)
-		{
-			// lets force a receipt
-			command.ResponseRequired = true;
-			
-			ss.WriteCommand(command, "CONNECT");
-			ss.WriteHeader("client-id", command.ClientId);
-			ss.WriteHeader("login", command.UserName);
-			ss.WriteHeader("passcode", command.Password);
-		    
-		    if (command.ResponseRequired)
-			{
+                }
+            }
+            else if(command == "ERROR")
+            {
+                string text = RemoveHeader(headers, "receipt-id");
+
+                if(text != null && text.StartsWith("ignore:"))
+                {
+                    Response answer = new Response();
+                    answer.CorrelationId = Int32.Parse(text.Substring("ignore:".Length));
+                    return answer;
+                }
+                else
+                {
+                    ExceptionResponse answer = new ExceptionResponse();
+                    if(text != null)
+                    {
+                        answer.CorrelationId = Int32.Parse(text);
+                    }
+
+                    BrokerError error = new BrokerError();
+                    error.Message = RemoveHeader(headers, "message");
+                    error.ExceptionClass = RemoveHeader(headers, "exceptionClass");
+                    // TODO is this the right header?
+                    answer.Exception = error;
+                    return answer;
+                }
+            }
+            else if (command == "MESSAGE")
+            {
+                return ReadMessage(command, headers, content);
+            }
+            Tracer.Error("Unknown command: " + command + " headers: " + headers);
+            return null;
+        }
+
+        protected virtual Command ReadMessage(string command, IDictionary headers, byte[]
content)
+        {
+            ActiveMQMessage message = null;
+            if (headers.Contains("content-length"))
+            {
+                message = new ActiveMQBytesMessage();
+                message.Content = content;
+            }
+            else
+            {
+                message = new ActiveMQTextMessage(encoding.GetString(content, 0, content.Length));
+            }
+
+            // TODO now lets set the various headers
+
+            message.Type = RemoveHeader(headers, "type");
+            message.Destination = StompHelper.ToDestination(RemoveHeader(headers, "destination"));
+            message.ReplyTo = StompHelper.ToDestination(RemoveHeader(headers, "reply-to"));
+            message.TargetConsumerId = StompHelper.ToConsumerId(RemoveHeader(headers, "subscription"));
+            message.CorrelationId = RemoveHeader(headers, "correlation-id");
+            message.MessageId = StompHelper.ToMessageId(RemoveHeader(headers, "message-id"));
+            message.Persistent = StompHelper.ToBool(RemoveHeader(headers, "persistent"),
true);
+
+            string header = RemoveHeader(headers, "priority");
+            if (header != null) message.Priority = Byte.Parse(header);
+
+            header = RemoveHeader(headers, "timestamp");
+            if (header != null) message.Timestamp = Int64.Parse(header);
+
+            header = RemoveHeader(headers, "expires");
+            if (header != null) message.Expiration = Int64.Parse(header);
+
+            // now lets add the generic headers
+            foreach (string key in headers.Keys)
+            {
+                Object value = headers[key];
+                if (value != null)
+                {
+                    // lets coerce some standard header extensions
+                    if (key == "NMSXGroupSeq")
+                    {
+                        value = Int32.Parse(value.ToString());
+                    }
+                }
+                message.Properties[key] = value;
+            }
+            MessageDispatch dispatch = new MessageDispatch();
+            dispatch.Message = message;
+            dispatch.ConsumerId = message.TargetConsumerId;
+            dispatch.Destination = message.Destination;
+            return dispatch;
+        }
+
+        protected virtual void WriteConnectionInfo(ConnectionInfo command, StompFrameStream
ss)
+        {
+            // lets force a receipt
+            command.ResponseRequired = true;
+
+            ss.WriteCommand(command, "CONNECT");
+            ss.WriteHeader("client-id", command.ClientId);
+            ss.WriteHeader("login", command.UserName);
+            ss.WriteHeader("passcode", command.Password);
+
+            if (command.ResponseRequired)
+            {
                 ss.WriteHeader("request-id", command.CommandId);
-			}
+            }
+
+            ss.Flush();
+        }
+
+        protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
+        {
+            ss.WriteCommand(command, "DISCONNECT");
+            System.Diagnostics.Debug.Assert(!command.ResponseRequired);
+            ss.Flush();
+        }
 
-			ss.Flush();
-		}
-		
-		protected virtual void WriteShutdownInfo(ShutdownInfo command, StompFrameStream ss)
-		{
-			ss.WriteCommand(command, "DISCONNECT");
-			System.Diagnostics.Debug.Assert(!command.ResponseRequired);
-			ss.Flush();
-		}
-
-		protected virtual void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
-		{
-			ss.WriteCommand(command, "SUBSCRIBE");
-			ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
-			ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
-		    ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
+        protected virtual void WriteConsumerInfo(ConsumerInfo command, StompFrameStream ss)
+        {
+            ss.WriteCommand(command, "SUBSCRIBE");
+            ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+            ss.WriteHeader("id", StompHelper.ToStomp(command.ConsumerId));
+            ss.WriteHeader("durable-subscriber-name", command.SubscriptionName);
             ss.WriteHeader("selector", command.Selector);
             if ( command.NoLocal )
                 ss.WriteHeader("no-local", command.NoLocal);
-			ss.WriteHeader("ack", "client");
+            ss.WriteHeader("ack", "client");
 
-			// ActiveMQ extensions to STOMP
-			ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
+            // ActiveMQ extensions to STOMP
+            ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
             if ( command.Exclusive )
-			    ss.WriteHeader("activemq.exclusive", command.Exclusive);
-		    
-			ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
-			ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
-			ss.WriteHeader("activemq.priority", command.Priority);
+                ss.WriteHeader("activemq.exclusive", command.Exclusive);
+
+            if( command.SubscriptionName != null )
+            {
+                ss.WriteHeader("activemq.subscriptionName", command.SubscriptionName);
+                // For an older 4.0 broker we need to set this header so they get the
+                // subscription as wel..
+                ss.WriteHeader("activemq.subcriptionName", command.SubscriptionName);
+            }
+            
+            ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
+            ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
+            ss.WriteHeader("activemq.priority", command.Priority);
             if ( command.Retroactive )
-			    ss.WriteHeader("activemq.retroactive", command.Retroactive);
+                ss.WriteHeader("activemq.retroactive", command.Retroactive);
 
             consumers[command.ConsumerId] = command.ConsumerId;
-			ss.Flush();
-		}
+            ss.Flush();
+        }
 
-		protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
-		{
-			object id = command.ObjectId;
-
-			if (id is ConsumerId)
-			{
-				ConsumerId consumerId = id as ConsumerId;
-				ss.WriteCommand(command, "UNSUBSCRIBE");
-				ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
-				ss.Flush();
+        protected virtual void WriteRemoveInfo(RemoveInfo command, StompFrameStream ss)
+        {
+            object id = command.ObjectId;
+
+            if (id is ConsumerId)
+            {
+                ConsumerId consumerId = id as ConsumerId;
+                ss.WriteCommand(command, "UNSUBSCRIBE");
+                ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
+                ss.Flush();
                 consumers.Remove(consumerId);
             }
             else if (id is SessionId)
             {
-				// When a session is removed, it needs to remove it's consumers too.
-				// Find all the consumer that were part of the session.
+                // When a session is removed, it needs to remove it's consumers too.
+                // Find all the consumer that were part of the session.
                 SessionId sessionId = (SessionId) id;
                 ArrayList matches = new ArrayList();
                 foreach (DictionaryEntry entry in consumers)
@@ -373,7 +378,7 @@
                     }
                 }
 
-            	bool unsubscribedConsumer = false;
+                bool unsubscribedConsumer = false;
 
                 // Un-subscribe them.
                 foreach (ConsumerId consumerId in matches)
@@ -382,162 +387,162 @@
                     ss.WriteHeader("id", StompHelper.ToStomp(consumerId));
                     ss.Flush();
                     consumers.Remove(consumerId);
-                	unsubscribedConsumer = true;
+                    unsubscribedConsumer = true;
                 }
 
-				if(!unsubscribedConsumer && command.ResponseRequired)
-				{
-					ss.WriteCommand(command, "UNSUBSCRIBE", true);
-					ss.WriteHeader("id", sessionId);
-					ss.Flush();
-				}
-            }
-			else if(id is ProducerId)
-			{
-				if(command.ResponseRequired)
-				{
-					ss.WriteCommand(command, "UNSUBSCRIBE", true);
-					ss.WriteHeader("id", id);
-					ss.Flush();
-				}
-			}
-			else if(id is ConnectionId)
-			{
-				if(command.ResponseRequired)
-				{
-					ss.WriteCommand(command, "UNSUBSCRIBE", true);
-					ss.WriteHeader("id", id);
-					ss.Flush();
-				}
-			}
-		}
-		
-		
-		protected virtual void WriteTransactionInfo(TransactionInfo command, StompFrameStream ss)
-		{
-			TransactionId id = command.TransactionId;
-			if (id is LocalTransactionId)
-			{
-				string type = "BEGIN";
-				TransactionType transactionType = (TransactionType) command.Type;
-				switch (transactionType)
-				{
-					case TransactionType.CommitOnePhase:
-						command.ResponseRequired = true;
-						type = "COMMIT";
-						break;
-					case TransactionType.Rollback:
-						command.ResponseRequired = true;
-						type = "ABORT";
-						break;
-				}
-
-				Tracer.Debug(">>> For transaction type: " + transactionType + " we are using
command type: " + type);
-				ss.WriteCommand(command, type);
-				ss.WriteHeader("transaction", StompHelper.ToStomp(id));
-				ss.Flush();
-			}
-		}
-		
-		protected virtual void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
-		{
-			ss.WriteCommand(command, "SEND");
-			ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
+                if(!unsubscribedConsumer && command.ResponseRequired)
+                {
+                    ss.WriteCommand(command, "UNSUBSCRIBE", true);
+                    ss.WriteHeader("id", sessionId);
+                    ss.Flush();
+                }
+            }
+            else if(id is ProducerId)
+            {
+                if(command.ResponseRequired)
+                {
+                    ss.WriteCommand(command, "UNSUBSCRIBE", true);
+                    ss.WriteHeader("id", id);
+                    ss.Flush();
+                }
+            }
+            else if(id is ConnectionId)
+            {
+                if(command.ResponseRequired)
+                {
+                    ss.WriteCommand(command, "UNSUBSCRIBE", true);
+                    ss.WriteHeader("id", id);
+                    ss.Flush();
+                }
+            }
+        }
+
+
+        protected virtual void WriteTransactionInfo(TransactionInfo command, StompFrameStream
ss)
+        {
+            TransactionId id = command.TransactionId;
+            if (id is LocalTransactionId)
+            {
+                string type = "BEGIN";
+                TransactionType transactionType = (TransactionType) command.Type;
+                switch (transactionType)
+                {
+                    case TransactionType.CommitOnePhase:
+                        command.ResponseRequired = true;
+                        type = "COMMIT";
+                        break;
+                    case TransactionType.Rollback:
+                        command.ResponseRequired = true;
+                        type = "ABORT";
+                        break;
+                }
+
+                Tracer.Debug(">>> For transaction type: " + transactionType + "
we are using command type: " + type);
+                ss.WriteCommand(command, type);
+                ss.WriteHeader("transaction", StompHelper.ToStomp(id));
+                ss.Flush();
+            }
+        }
+
+        protected virtual void WriteMessage(ActiveMQMessage command, StompFrameStream ss)
+        {
+            ss.WriteCommand(command, "SEND");
+            ss.WriteHeader("destination", StompHelper.ToStomp(command.Destination));
             if (command.ReplyTo != null)
-			    ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
+                ss.WriteHeader("reply-to", StompHelper.ToStomp(command.ReplyTo));
             if (command.CorrelationId != null )
                 ss.WriteHeader("correlation-id", command.CorrelationId);
             if (command.Expiration != 0)
                 ss.WriteHeader("expires", command.Expiration);
             if (command.Priority != 4)
-			    ss.WriteHeader("priority", command.Priority);
+                ss.WriteHeader("priority", command.Priority);
             if (command.Type != null)
-                ss.WriteHeader("type", command.Type);            
-		    if (command.TransactionId!=null)
-			    ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
-		    
-			ss.WriteHeader("persistent", command.Persistent);
-			
-			// lets force the content to be marshalled
-			
-			command.BeforeMarshall(null);
-			if (command is ActiveMQTextMessage)
-			{
-				ActiveMQTextMessage textMessage = command as ActiveMQTextMessage;
-				ss.Content = encoding.GetBytes(textMessage.Text);
-			}
-			else
-			{
-				ss.Content = command.Content;
-				if(null != command.Content)
-				{
-					ss.ContentLength = command.Content.Length;
-				}
-				else
-				{
-					ss.ContentLength = 0;
-				}
-			}
-	
-			IPrimitiveMap map = command.Properties;
-			foreach (string key in map.Keys)
-			{
-				ss.WriteHeader(key, map[key]);
-			}
-			ss.Flush();
-		}
-		
-		protected virtual void WriteMessageAck(MessageAck command, StompFrameStream ss)
-		{
-			ss.WriteCommand(command, "ACK", true);
-			
-			// TODO handle bulk ACKs?
-			ss.WriteHeader("message-id", StompHelper.ToStomp(command.LastMessageId));
-			if(command.TransactionId != null)
-			{
+                ss.WriteHeader("type", command.Type);
+            if (command.TransactionId!=null)
                 ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
-			}
 
-			ss.Flush();
-		}
-		
-		protected virtual void SendCommand(Command command)
-		{
-			if (transport == null)
-			{
-				Tracer.Fatal("No transport configured so cannot return command: " + command);
-			}
-			else
-			{
-				transport.Command(transport, command);
-			}
-		}
-		
-		protected virtual string RemoveHeader(IDictionary headers, string name)
-		{
-			object value = headers[name];
-			if (value == null)
-			{
-				return null;
-			}
-			else
-			{
-				headers.Remove(name);
-				return value.ToString();
-			}
-		}
-		
-		
-		protected virtual string ToString(object value)
-		{
-			if (value != null)
-			{
-				return value.ToString();
-			}
-			else
-			{
-				return null;
-			}
-		}
+            ss.WriteHeader("persistent", command.Persistent);
+
+            // lets force the content to be marshalled
+
+            command.BeforeMarshall(null);
+            if (command is ActiveMQTextMessage)
+            {
+                ActiveMQTextMessage textMessage = command as ActiveMQTextMessage;
+                ss.Content = encoding.GetBytes(textMessage.Text);
+            }
+            else
+            {
+                ss.Content = command.Content;
+                if(null != command.Content)
+                {
+                    ss.ContentLength = command.Content.Length;
+                }
+                else
+                {
+                    ss.ContentLength = 0;
+                }
+            }
+
+            IPrimitiveMap map = command.Properties;
+            foreach (string key in map.Keys)
+            {
+                ss.WriteHeader(key, map[key]);
+            }
+            ss.Flush();
+        }
+
+        protected virtual void WriteMessageAck(MessageAck command, StompFrameStream ss)
+        {
+            ss.WriteCommand(command, "ACK", true);
+
+            // TODO handle bulk ACKs?
+            ss.WriteHeader("message-id", StompHelper.ToStomp(command.LastMessageId));
+            if(command.TransactionId != null)
+            {
+                ss.WriteHeader("transaction", StompHelper.ToStomp(command.TransactionId));
+            }
+
+            ss.Flush();
+        }
+
+        protected virtual void SendCommand(Command command)
+        {
+            if (transport == null)
+            {
+                Tracer.Fatal("No transport configured so cannot return command: " + command);
+            }
+            else
+            {
+                transport.Command(transport, command);
+            }
+        }
+
+        protected virtual string RemoveHeader(IDictionary headers, string name)
+        {
+            object value = headers[name];
+            if (value == null)
+            {
+                return null;
+            }
+            else
+            {
+                headers.Remove(name);
+                return value.ToString();
+            }
+        }
+
+
+        protected virtual string ToString(object value)
+        {
+            if (value != null)
+            {
+                return value.ToString();
+            }
+            else
+            {
+                return null;
+            }
+        }
     }
 }



Mime
View raw message