activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r589629 [4/7] - in /activemq/activemq-dotnet/trunk: ./ src/main/csharp/ActiveMQ/ src/main/csharp/ActiveMQ/Commands/ src/main/csharp/ActiveMQ/OpenWire/ src/main/csharp/ActiveMQ/OpenWire/V1/ src/main/csharp/ActiveMQ/OpenWire/V2/ src/main/csha...
Date Mon, 29 Oct 2007 13:55:57 GMT
Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs Mon Oct 29 06:55:09 2007
@@ -14,475 +14,521 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using NMS;
 using System;
 using System.Collections;
+using Apache.ActiveMQ.Commands;
+using Apache.NMS;
 
-namespace ActiveMQ {
-        /// <summary>
-        /// Default provider of ISession
-        /// </summary>
-        public class Session : ISession
-        {
-                private Connection connection;
-                private SessionInfo info;
-                private AcknowledgementMode acknowledgementMode;
-                private long consumerCounter;
-                private long producerCounter;
-                private int prefetchSize = 1000;
-                private int maximumPendingMessageLimit;
-                private byte priority;
-                private bool dispatchAsync;
-                private bool exclusive;
-                private bool retroactive;
-				private bool asyncSend;
-                private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
-                private TransactionContext transactionContext;
-                private DispatchingThread dispatchingThread;
-                
-                public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
-                {
-                        this.connection = connection;
-                        this.info = info;
-                        this.acknowledgementMode = acknowledgementMode;
-						this.asyncSend = connection.AsyncSend;
-                        transactionContext = new TransactionContext(this);
-                        dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
-                        dispatchingThread.ExceptionListener += new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
-                }
-
-                void dispatchingThread_ExceptionListener(Exception exception)
-                {
-                        connection.OnSessionException(this, exception);
-                }
-
-
-                /// <summary>
-                /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
-                /// until acknowledgements are received.
-                /// </summary>
-                public int PrefetchSize {
-                        get { return prefetchSize; }
-                        set { this.prefetchSize = value; }
-                }
-
-                /// <summary>
-                /// Sets the maximum number of messages to keep around per consumer
-                /// in addition to the prefetch window for non-durable topics until messages
-                /// will start to be evicted for slow consumers.
-                /// Must be > 0 to enable this feature
-                /// </summary>
-                public int MaximumPendingMessageLimit {
-                        get { return maximumPendingMessageLimit; }
-                        set { this.maximumPendingMessageLimit = value; }
-                }
-
-                /// <summary>
-                /// Enables or disables whether asynchronous dispatch should be used by the broker
-                /// </summary>
-                public bool DispatchAsync {
-                        get { return dispatchAsync; }
-                        set { this.dispatchAsync = value; }
-                }
-
-                /// <summary>
-                /// Enables or disables exclusive consumers when using queues. An exclusive consumer means
-                /// only one instance of a consumer is allowed to process messages on a queue to preserve order
-                /// </summary>
-                public bool Exclusive {
-                        get { return exclusive; }
-                        set { this.exclusive = value; }
-                }
-
-                /// <summary>
-                /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
-                /// </summary>
-                public bool Retroactive {
-                        get { return retroactive; }
-                        set { this.retroactive = value; }
-                }
-
-                /// <summary>
-                /// Sets the default consumer priority for consumers
-                /// </summary>
-                public byte Priority {
-                        get { return priority; }
-                        set { this.priority = value; }
-                }
-		
-				/// <summary>
-				/// This property indicates whether or not async send is enabled.
-				/// </summary>
-				public bool AsyncSend
-				{
-					get { return asyncSend; }
-					set { asyncSend = value; }
-				}
-		
-                public void Dispose()
-                {
-                        connection.DisposeOf(info.SessionId);
-                }
-
-                public IMessageProducer CreateProducer()
-                {
-                        return CreateProducer(null);
-                }
-
-                public IMessageProducer CreateProducer(IDestination destination)
-                {
-                        ProducerInfo command = CreateProducerInfo(destination);
-                        connection.SyncRequest(command);
-                        return new MessageProducer(this, command);
-                }
-
-
-
-                public IMessageConsumer CreateConsumer(IDestination destination)
-                {
-                        return CreateConsumer(destination, null);
-                }
-
-                public IMessageConsumer CreateConsumer(IDestination destination, string selector)
-                {
-	                return CreateConsumer(destination, null, false);
-				}
-				
-                public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
-                {
-                        ConsumerInfo command = CreateConsumerInfo(destination, selector);
-                        command.NoLocal = noLocal;
-
-                        ConsumerId consumerId = command.ConsumerId;
-
-                        try
-                        {
-                                MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
-                                // lets register the consumer first in case we start dispatching messages immediately
-                                connection.AddConsumer(consumerId, consumer);
-
-                                connection.SyncRequest(command);
-
-                                consumers[consumerId] = consumer;
-                                return consumer;
-                        }
-                        catch (Exception e)
-                        {
-                                connection.RemoveConsumer(consumerId);
-                                throw e;
-                        }
-                }
-
-                public IMessageConsumer CreateDurableConsumer(
-                        ITopic destination,
-                        string name,
-                        string selector,
-                        bool noLocal)
-                {
-                        ConsumerInfo command = CreateConsumerInfo(destination, selector);
-                        ConsumerId consumerId = command.ConsumerId;
-                        command.SubscriptionName = name;
-                        command.NoLocal = noLocal;
-
-                        try
-                        {
-                                MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
-                                // lets register the consumer first in case we start dispatching messages immediately
-                                connection.AddConsumer(consumerId, consumer);
-
-                                connection.SyncRequest(command);
-
-                                consumers[consumerId] = consumer;
-                                return consumer;
-                        }
-                        catch (Exception e)
-                        {
-                                connection.RemoveConsumer(consumerId);
-                                throw e;
-                        }
-                }
-
-                public IQueue GetQueue(string name)
-                {
-                        return new ActiveMQQueue(name);
-                }
-
-                public ITopic GetTopic(string name)
-                {
-                        return new ActiveMQTopic(name);
-                }
-
-                public ITemporaryQueue CreateTemporaryQueue()
-                {
-                        ActiveMQTempQueue answer = new ActiveMQTempQueue(connection.CreateTemporaryDestinationName());
-                        CreateTemporaryDestination(answer);
-                        return answer;
-                }
-
-                public ITemporaryTopic CreateTemporaryTopic()
-                {
-                        ActiveMQTempTopic answer = new ActiveMQTempTopic(connection.CreateTemporaryDestinationName());
-                        CreateTemporaryDestination(answer);
-                        return answer;
-                }
-
-                protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
-                {
-                        DestinationInfo command = new DestinationInfo();
-                        command.ConnectionId = connection.ConnectionId;
-                        command.OperationType = 0; // 0 is add
-                        command.Destination = tempDestination;
-
-                        connection.SyncRequest(command);
-                }
-
-                protected void DestroyTemporaryDestination(ActiveMQDestination tempDestination)
-                {
-                        DestinationInfo command = new DestinationInfo();
-                        command.ConnectionId = connection.ConnectionId;
-                        command.OperationType = 1; // 1 is remove
-                        command.Destination = tempDestination;
-
-                        connection.SyncRequest(command);
-                }
-
-
-                public IMessage CreateMessage()
-                {
-                        ActiveMQMessage answer = new ActiveMQMessage();
-                        Configure(answer);
-                        return answer;
-                }
-
-
-                public ITextMessage CreateTextMessage()
-                {
-                        ActiveMQTextMessage answer = new ActiveMQTextMessage();
-                        Configure(answer);
-                        return answer;
-                }
-
-                public ITextMessage CreateTextMessage(string text)
-                {
-                        ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
-                        Configure(answer);
-                        return answer;
-                }
-
-                public IMapMessage CreateMapMessage()
-                {
-                        return new ActiveMQMapMessage();
-                }
-
-                public IBytesMessage CreateBytesMessage()
-                {
-                        return new ActiveMQBytesMessage();
-                }
-
-                public IBytesMessage CreateBytesMessage(byte[] body)
-                {
-                        ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
-                        answer.Content = body;
-                        return answer;
-                }
-
-                public IObjectMessage CreateObjectMessage(object body)
-                {
-                        ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
-                        answer.Body = body;
-                        return answer;
-                }
-
-                public void Commit()
-                {
-                        if (!Transacted)
-                        {
-                                throw new InvalidOperationException(
-                                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
-                                                + acknowledgementMode);
-                        }
-                        transactionContext.Commit();
-                }
-
-                public void Rollback()
-                {
-                        if (!Transacted)
-                        {
-                                throw new InvalidOperationException(
-                                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
-                                                + acknowledgementMode);
-                        }
-                        transactionContext.Rollback();
-
-                        // lets ensure all the consumers redeliver any rolled back messages
-                        foreach (MessageConsumer consumer in GetConsumers())
-                        {
-                                consumer.RedeliverRolledBackMessages();
-                        }
-                }
-
-
-
-                // Properties
-
-                public Connection Connection {
-                        get { return connection; }
-                }
-
-                public SessionId SessionId {
-                        get { return info.SessionId; }
-                }
-
-                public AcknowledgementMode AcknowledgementMode {
-                        get { return acknowledgementMode; }
-                }
-
-                public bool Transacted {
-                        get { return acknowledgementMode == AcknowledgementMode.Transactional; }
-                }
-
-                public TransactionContext TransactionContext {
-                        get { return transactionContext; }
-                }
-
-                // Implementation methods
-				public void DoSend(ActiveMQMessage message)
-                {
-					if (AsyncSend)
-					{
-						connection.OneWay(message);
-					}
-					else
-					{
-						connection.SyncRequest(message);
-					}
-                }
-
-                public void Close()
-                {
-                        // To do: what about session id?
-                        StopAsyncDelivery();
-                }
-                
-                /// <summary>
-                /// Ensures that a transaction is started
-                /// </summary>
-                public void DoStartTransaction()
-                {
-                        if (Transacted)
-                        {
-                                transactionContext.Begin();
-                        }
-                }
-
-                public void DisposeOf(ConsumerId objectId)
-                {
-                        consumers.Remove(objectId);
-                        connection.RemoveConsumer(objectId);
-                        connection.DisposeOf(objectId);
-                }
-
-                /// <summary>
-                /// Private method called by the dispatcher thread in order to perform
-                /// asynchronous delivery of queued (inbound) messages.
-                /// </summary>
-                private void DispatchAsyncMessages()
-                {
-                        // lets iterate through each consumer created by this session
-                        // ensuring that they have all pending messages dispatched
-                        foreach (MessageConsumer consumer in GetConsumers())
-                        {
-                                consumer.DispatchAsyncMessages();
-                        }
-                }
-
-
-                /// <summary>
-                /// Returns a copy of the current consumers in a thread safe way to avoid concurrency
-                /// problems if the consumers are changed in another thread
-                /// </summary>
-                protected ICollection GetConsumers()
-                {
-                        lock (consumers.SyncRoot)
-                        {
-                                return new ArrayList(consumers.Values);
-                        }
-                }
-
-                protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
-                {
-                        ConsumerInfo answer = new ConsumerInfo();
-                        ConsumerId id = new ConsumerId();
-                        id.ConnectionId = info.SessionId.ConnectionId;
-                        id.SessionId = info.SessionId.Value;
-                        lock (this)
-                        {
-                                id.Value = ++consumerCounter;
-                        }
-                        answer.ConsumerId = id;
-                        answer.Destination = ActiveMQDestination.Transform(destination);
-                        answer.Selector = selector;
-                        answer.PrefetchSize = prefetchSize;
-                        answer.Priority = priority;
-                        answer.Exclusive = exclusive;
-                        answer.DispatchAsync = dispatchAsync;
-                        answer.Retroactive = retroactive;
-
-                        // If the destination contained a URI query, then use it to set public properties
-                        // on the ConsumerInfo
-                        ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-                        if (amqDestination != null && amqDestination.Options != null)
-                        {
-                                Util.URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
-                        }
-
-                        return answer;
-                }
-
-                protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
-                {
-                        ProducerInfo answer = new ProducerInfo();
-                        ProducerId id = new ProducerId();
-                        id.ConnectionId = info.SessionId.ConnectionId;
-                        id.SessionId = info.SessionId.Value;
-                        lock (this)
-                        {
-                                id.Value = ++producerCounter;
-                        }
-                        answer.ProducerId = id;
-                        answer.Destination = ActiveMQDestination.Transform(destination);
-
-                        answer.Destination = ActiveMQDestination.Transform(destination);
-
-                        // If the destination contained a URI query, then use it to set public
-                        // properties on the ProducerInfo
-                        ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-                        if (amqDestination != null && amqDestination.Options != null)
-                        {
-                                Util.URISupport.SetProperties(answer, amqDestination.Options, "producer.");
-                        }
-
-                        return answer;
-                }
-
-                /// <summary>
-                /// Configures the message command
-                /// </summary>
-                protected void Configure(ActiveMQMessage message)
-                {
-                }
-
-				internal void StopAsyncDelivery()
-				{
-					dispatchingThread.Stop();
-				}
-
-				internal void StartAsyncDelivery(Dispatcher dispatcher)
-				{
-					if(dispatcher != null)
-						dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
-					dispatchingThread.Start();
-				}
-        }
-}
+namespace Apache.ActiveMQ
+{
+	/// <summary>
+	/// Default provider of ISession
+	/// </summary>
+	public class Session : ISession
+	{
+		private AcknowledgementMode acknowledgementMode;
+		private bool asyncSend;
+		private Connection connection;
+		private long consumerCounter;
+		private IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+		private bool dispatchAsync;
+		private DispatchingThread dispatchingThread;
+		private bool exclusive;
+		private SessionInfo info;
+		private int maximumPendingMessageLimit;
+		private int prefetchSize = 1000;
+		private byte priority;
+		private long producerCounter;
+		private bool retroactive;
+		private TransactionContext transactionContext;
+		private bool disposed = false;
+
+		public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
+		{
+			this.connection = connection;
+			this.info = info;
+			this.acknowledgementMode = acknowledgementMode;
+			this.asyncSend = connection.AsyncSend;
+			transactionContext = new TransactionContext(this);
+			dispatchingThread =
+					new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
+			dispatchingThread.ExceptionListener +=
+					new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
+		}
+
+		~Session()
+		{
+			Dispose(false);
+		}
+
+		/// <summary>
+		/// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
+		/// until acknowledgements are received.
+		/// </summary>
+		public int PrefetchSize
+		{
+			get { return prefetchSize; }
+			set { this.prefetchSize = value; }
+		}
+
+		/// <summary>
+		/// Sets the maximum number of messages to keep around per consumer
+		/// in addition to the prefetch window for non-durable topics until messages
+		/// will start to be evicted for slow consumers.
+		/// Must be > 0 to enable this feature
+		/// </summary>
+		public int MaximumPendingMessageLimit
+		{
+			get { return maximumPendingMessageLimit; }
+			set { this.maximumPendingMessageLimit = value; }
+		}
+
+		/// <summary>
+		/// Enables or disables whether asynchronous dispatch should be used by the broker
+		/// </summary>
+		public bool DispatchAsync
+		{
+			get { return dispatchAsync; }
+			set { this.dispatchAsync = value; }
+		}
+
+		/// <summary>
+		/// Enables or disables exclusive consumers when using queues. An exclusive consumer means
+		/// only one instance of a consumer is allowed to process messages on a queue to preserve order
+		/// </summary>
+		public bool Exclusive
+		{
+			get { return exclusive; }
+			set { this.exclusive = value; }
+		}
+
+		/// <summary>
+		/// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
+		/// </summary>
+		public bool Retroactive
+		{
+			get { return retroactive; }
+			set { this.retroactive = value; }
+		}
+
+		/// <summary>
+		/// Sets the default consumer priority for consumers
+		/// </summary>
+		public byte Priority
+		{
+			get { return priority; }
+			set { this.priority = value; }
+		}
+
+		/// <summary>
+		/// This property indicates whether or not async send is enabled.
+		/// </summary>
+		public bool AsyncSend
+		{
+			get { return asyncSend; }
+			set { asyncSend = value; }
+		}
+
+		public Connection Connection
+		{
+			get { return connection; }
+		}
+
+		public SessionId SessionId
+		{
+			get { return info.SessionId; }
+		}
+
+		public TransactionContext TransactionContext
+		{
+			get { return transactionContext; }
+		}
+
+		#region ISession Members
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		protected void Dispose(bool disposing)
+		{
+			if(disposed)
+			{
+				return;
+			}
+
+			if(disposing)
+			{
+				// Dispose managed code here.
+			}
+
+			try
+			{
+				connection.DisposeOf(info.SessionId);
+			}
+			catch
+			{
+				// Ignore network errors.
+			}
+
+			disposed = true;
+		}
+
+		public IMessageProducer CreateProducer()
+		{
+			return CreateProducer(null);
+		}
+
+		public IMessageProducer CreateProducer(IDestination destination)
+		{
+			ProducerInfo command = CreateProducerInfo(destination);
+			connection.SyncRequest(command);
+			return new MessageProducer(this, command);
+		}
+
+
+		public IMessageConsumer CreateConsumer(IDestination destination)
+		{
+			return CreateConsumer(destination, null);
+		}
+
+		public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+		{
+			return CreateConsumer(destination, selector, false);
+		}
+
+		public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+		{
+			ConsumerInfo command = CreateConsumerInfo(destination, selector);
+			command.NoLocal = noLocal;
+			command.AcknowledgementMode = acknowledgementMode;
+
+			ConsumerId consumerId = command.ConsumerId;
+
+			try
+			{
+				MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
+				// lets register the consumer first in case we start dispatching messages immediately
+				connection.AddConsumer(consumerId, consumer);
+
+				connection.SyncRequest(command);
+
+				consumers[consumerId] = consumer;
+				return consumer;
+			}
+			catch(Exception e)
+			{
+				connection.RemoveConsumer(consumerId);
+				throw e;
+			}
+		}
+
+		public IMessageConsumer CreateDurableConsumer(
+				ITopic destination,
+				string name,
+				string selector,
+				bool noLocal)
+		{
+			ConsumerInfo command = CreateConsumerInfo(destination, selector);
+			ConsumerId consumerId = command.ConsumerId;
+			command.SubscriptionName = name;
+			command.NoLocal = noLocal;
+
+			try
+			{
+				MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode);
+				// lets register the consumer first in case we start dispatching messages immediately
+				connection.AddConsumer(consumerId, consumer);
+
+				connection.SyncRequest(command);
+
+				consumers[consumerId] = consumer;
+				return consumer;
+			}
+			catch(Exception e)
+			{
+				connection.RemoveConsumer(consumerId);
+				throw e;
+			}
+		}
+
+		public IQueue GetQueue(string name)
+		{
+			return new ActiveMQQueue(name);
+		}
+
+		public ITopic GetTopic(string name)
+		{
+			return new ActiveMQTopic(name);
+		}
+
+		public ITemporaryQueue CreateTemporaryQueue()
+		{
+			ActiveMQTempQueue answer = new ActiveMQTempQueue(connection.CreateTemporaryDestinationName());
+			CreateTemporaryDestination(answer);
+			return answer;
+		}
+
+		public ITemporaryTopic CreateTemporaryTopic()
+		{
+			ActiveMQTempTopic answer = new ActiveMQTempTopic(connection.CreateTemporaryDestinationName());
+			CreateTemporaryDestination(answer);
+			return answer;
+		}
+
+
+		public IMessage CreateMessage()
+		{
+			ActiveMQMessage answer = new ActiveMQMessage();
+			Configure(answer);
+			return answer;
+		}
+
+
+		public ITextMessage CreateTextMessage()
+		{
+			ActiveMQTextMessage answer = new ActiveMQTextMessage();
+			Configure(answer);
+			return answer;
+		}
+
+		public ITextMessage CreateTextMessage(string text)
+		{
+			ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
+			Configure(answer);
+			return answer;
+		}
+
+		public IMapMessage CreateMapMessage()
+		{
+			return new ActiveMQMapMessage();
+		}
+
+		public IBytesMessage CreateBytesMessage()
+		{
+			return new ActiveMQBytesMessage();
+		}
+
+		public IBytesMessage CreateBytesMessage(byte[] body)
+		{
+			ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
+			answer.Content = body;
+			return answer;
+		}
+
+		public IObjectMessage CreateObjectMessage(object body)
+		{
+			ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
+			answer.Body = body;
+			return answer;
+		}
+
+		public void Commit()
+		{
+			if(!Transacted)
+			{
+				throw new InvalidOperationException(
+						"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+						+ acknowledgementMode);
+			}
+			transactionContext.Commit();
+		}
+
+		public void Rollback()
+		{
+			if(!Transacted)
+			{
+				throw new InvalidOperationException(
+						"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+						+ acknowledgementMode);
+			}
+			transactionContext.Rollback();
+
+			// lets ensure all the consumers redeliver any rolled back messages
+			foreach(MessageConsumer consumer in GetConsumers())
+			{
+				consumer.RedeliverRolledBackMessages();
+			}
+		}
+
+
+		// Properties
+
+		public AcknowledgementMode AcknowledgementMode
+		{
+			get { return acknowledgementMode; }
+		}
+
+		public bool Transacted
+		{
+			get { return acknowledgementMode == AcknowledgementMode.Transactional; }
+		}
+
+		public void Close()
+		{
+			// To do: what about session id?
+			StopAsyncDelivery();
+		}
+
+		#endregion
+
+		private void dispatchingThread_ExceptionListener(Exception exception)
+		{
+			connection.OnSessionException(this, exception);
+		}
+
+		protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
+		{
+			DestinationInfo command = new DestinationInfo();
+			command.ConnectionId = connection.ConnectionId;
+			command.OperationType = 0; // 0 is add
+			command.Destination = tempDestination;
+
+			connection.SyncRequest(command);
+		}
+
+		protected void DestroyTemporaryDestination(ActiveMQDestination tempDestination)
+		{
+			DestinationInfo command = new DestinationInfo();
+			command.ConnectionId = connection.ConnectionId;
+			command.OperationType = 1; // 1 is remove
+			command.Destination = tempDestination;
+
+			connection.SyncRequest(command);
+		}
+
+		public void DoSend(ActiveMQMessage message)
+		{
+			if(AsyncSend)
+			{
+				connection.OneWay(message);
+			}
+			else
+			{
+				connection.SyncRequest(message);
+			}
+		}
+
+		/// <summary>
+		/// Ensures that a transaction is started
+		/// </summary>
+		public void DoStartTransaction()
+		{
+			if(Transacted)
+			{
+				transactionContext.Begin();
+			}
+		}
+
+		public void DisposeOf(ConsumerId objectId)
+		{
+			consumers.Remove(objectId);
+			connection.RemoveConsumer(objectId);
+			connection.DisposeOf(objectId);
+		}
+
+		/// <summary>
+		/// Private method called by the dispatcher thread in order to perform
+		/// asynchronous delivery of queued (inbound) messages.
+		/// </summary>
+		private void DispatchAsyncMessages()
+		{
+			// lets iterate through each consumer created by this session
+			// ensuring that they have all pending messages dispatched
+			foreach(MessageConsumer consumer in GetConsumers())
+			{
+				consumer.DispatchAsyncMessages();
+			}
+		}
+
+
+		/// <summary>
+		/// Returns a copy of the current consumers in a thread safe way to avoid concurrency
+		/// problems if the consumers are changed in another thread
+		/// </summary>
+		protected ICollection GetConsumers()
+		{
+			lock(consumers.SyncRoot)
+			{
+				return new ArrayList(consumers.Values);
+			}
+		}
+
+		protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
+		{
+			ConsumerInfo answer = new ConsumerInfo();
+			ConsumerId id = new ConsumerId();
+			id.ConnectionId = info.SessionId.ConnectionId;
+			id.SessionId = info.SessionId.Value;
+			lock(this)
+			{
+				id.Value = ++consumerCounter;
+			}
+			answer.ConsumerId = id;
+			answer.Destination = ActiveMQDestination.Transform(destination);
+			answer.Selector = selector;
+			answer.PrefetchSize = prefetchSize;
+			answer.Priority = priority;
+			answer.Exclusive = exclusive;
+			answer.DispatchAsync = dispatchAsync;
+			answer.Retroactive = retroactive;
+
+			// If the destination contained a URI query, then use it to set public properties
+			// on the ConsumerInfo
+			ActiveMQDestination amqDestination = destination as ActiveMQDestination;
+			if(amqDestination != null && amqDestination.Options != null)
+			{
+				Util.URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
+			}
+
+			return answer;
+		}
+
+		protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
+		{
+			ProducerInfo answer = new ProducerInfo();
+			ProducerId id = new ProducerId();
+			id.ConnectionId = info.SessionId.ConnectionId;
+			id.SessionId = info.SessionId.Value;
+			lock(this)
+			{
+				id.Value = ++producerCounter;
+			}
+			answer.ProducerId = id;
+			answer.Destination = ActiveMQDestination.Transform(destination);
+
+			// If the destination contained a URI query, then use it to set public
+			// properties on the ProducerInfo
+			ActiveMQDestination amqDestination = destination as ActiveMQDestination;
+			if(amqDestination != null && amqDestination.Options != null)
+			{
+				Util.URISupport.SetProperties(answer, amqDestination.Options, "producer.");
+			}
+
+			return answer;
+		}
+
+		/// <summary>
+		/// Configures the message command
+		/// </summary>
+		protected void Configure(ActiveMQMessage message)
+		{
+		}
+
+		internal void StopAsyncDelivery()
+		{
+			dispatchingThread.Stop();
+		}
+
+		internal void StartAsyncDelivery(Dispatcher dispatcher)
+		{
+			if(dispatcher != null)
+			{
+				dispatcher.SetAsyncDelivery(dispatchingThread.EventHandle);
+			}
+			dispatchingThread.Start();
+		}
+	}
+}
\ No newline at end of file

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/TransactionContext.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/TransactionContext.cs Mon Oct 29 06:55:09 2007
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ;
-using ActiveMQ.Commands;
+using Apache.ActiveMQ;
+using Apache.ActiveMQ.Commands;
 using System.Collections;
 
 
-namespace ActiveMQ
+namespace Apache.ActiveMQ
 {
 	public enum TransactionType
     {
@@ -27,7 +27,7 @@
     }
 }
 
-namespace ActiveMQ
+namespace Apache.ActiveMQ
 {
 	public class TransactionContext
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/FutureResponse.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/FutureResponse.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/FutureResponse.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/FutureResponse.cs Mon Oct 29 06:55:09 2007
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
+using Apache.ActiveMQ.Commands;
 using System;
 using System.Threading;
-using ActiveMQ.Util;
+using Apache.ActiveMQ.Util;
+using Apache.NMS;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 	
 	/// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransport.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using NMS;
+using Apache.ActiveMQ.Commands;
+using Apache.NMS;
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 	public delegate void CommandHandler(ITransport sender, Command command);
 	public delegate void ExceptionHandler(ITransport sender, Exception command);

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransportFactory.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransportFactory.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ITransportFactory.cs Mon Oct 29 06:55:09 2007
@@ -17,7 +17,7 @@
 
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 	public interface ITransportFactory
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/IWireFormat.cs Mon Oct 29 06:55:09 2007
@@ -17,7 +17,7 @@
 using System;
 using System.IO;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 	/// <summary>
 	/// Represents the marshalling of commands to and from an IO stream

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/LoggingTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/LoggingTransport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/LoggingTransport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/LoggingTransport.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
+using Apache.NMS;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.Transport;
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 	
 	/// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/MutexTransport.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.Transport;
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 	
     /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/ResponseCorrelator.cs Mon Oct 29 06:55:09 2007
@@ -18,10 +18,11 @@
 using System;
 using System.Collections;
 
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.Transport;
+using Apache.NMS;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 	
     /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompFrameStream.cs Mon Oct 29 06:55:09 2007
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 using System.Reflection;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire.V1;
-using ActiveMQ.Transport;
-using NMS;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.OpenWire.V1;
+using Apache.ActiveMQ.Transport;
+using Apache.NMS;
 using System;
 using System.Collections;
 using System.IO;
 using System.Text;
 
-namespace ActiveMQ.Transport.Stomp
+namespace Apache.ActiveMQ.Transport.Stomp
 {
     /// <summary>
     /// A Stream for writing a <a href="http://stomp.codehaus.org/">STOMP</a> Frame
@@ -103,14 +103,9 @@
 			{
 				ds.Write(content);
 			}
-			
-			// if no content length then lets write a null
-			if (contentLength < 0)
-			{
-				ds.Write(NULL);
-			}
-		}
 
-		
+			// Always write a terminating NULL byte to end the content frame.
+			ds.Write(NULL);
+		}
     }
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompHelper.cs Mon Oct 29 06:55:09 2007
@@ -1,194 +1,261 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System.Reflection;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire.V1;
-using ActiveMQ.Transport;
-using NMS;
-using System;
-using System.Collections;
-using System.IO;
-using System.Text;
-
-namespace ActiveMQ.Transport.Stomp
-{
-    /// <summary>
-    /// Some <a href="http://stomp.codehaus.org/">STOMP</a> protocol conversion helper methods.
-    /// </summary>
-    public class StompHelper
-    {
-
-
-		public static ActiveMQDestination ToDestination(string text)
-		{
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Text;
+using Apache.ActiveMQ.Commands;
+using Apache.NMS;
+
+namespace Apache.ActiveMQ.Transport.Stomp
+{
+    /// <summary>
+    /// Some <a href="http://stomp.codehaus.org/">STOMP</a> protocol conversion helper methods.
+    /// </summary>
+    public class StompHelper
+    {
+
+
+		public static ActiveMQDestination ToDestination(string text)
+		{
 		    if( text == null )
 		    {
-                return null;
-		    }		    
-			int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
-			if (text.StartsWith("/queue/"))
-			{
-				text = text.Substring("/queue/".Length);
-			}
-			else if (text.StartsWith("/topic/"))
-			{
-				text = text.Substring("/topic/".Length);
-				type = ActiveMQDestination.ACTIVEMQ_TOPIC;
-			}
-			else if (text.StartsWith("/temp-topic/"))
-			{
-				text = text.Substring("/temp-topic/".Length);
-				type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
-			}
-			else if (text.StartsWith("/temp-queue/"))
-			{
-				text = text.Substring("/temp-queue/".Length);
-				type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
-			}
-			return ActiveMQDestination.CreateDestination(type, text);
-		}
-
-		public static string ToStomp(ActiveMQDestination destination)
-		{
-			if (destination == null)
-			{
-				return null;
-			}
-			else
-			{
-				switch (destination.DestinationType)
-				{
-					case DestinationType.Topic:
-						return "/topic/" + destination.PhysicalName;
-					
-					case DestinationType.TemporaryTopic:
-						return "/temp-topic/" + destination.PhysicalName;
-					
-					case DestinationType.TemporaryQueue:
-						return "/temp-queue/" + destination.PhysicalName;
-					
-					default:
-						return "/queue/" + destination.PhysicalName;
-				}
-			}
-		}
-		
-		public static string ToStomp(ConsumerId id)
-		{
-			return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
-		}
-		
-		public static ConsumerId ToConsumerId(string text)
-		{
-			if (text == null)
-			{
-				return null;
-			}
-			ConsumerId answer = new ConsumerId();
-			int idx = text.LastIndexOf(':');
-			if (idx >= 0) {
-				answer.Value = Int32.Parse(text.Substring(idx + 1));
-				text = text.Substring(0, idx);
-				idx = text.LastIndexOf(':');
-				if (idx >= 0) {
-					answer.SessionId = Int32.Parse(text.Substring(idx + 1));
-					text = text.Substring(0, idx);
-				}
-			}
-			answer.ConnectionId = text;
-			return answer;
-		}
-		
-		public static string ToStomp(ProducerId id)
-		{
-			return id.ConnectionId + ":" + id.SessionId + ":" + id.Value;
-		}
-		
-		public static ProducerId ToProducerId(string text)
-		{
-			if (text == null)
-			{
-				return null;
-			}
-			ProducerId answer = new ProducerId();
-			int idx = text.LastIndexOf(':');
-			if (idx >= 0) {
-				answer.Value = Int32.Parse(text.Substring(idx + 1));
-				text = text.Substring(0, idx);
-				idx = text.LastIndexOf(':');
-				if (idx >= 0) {
-					answer.SessionId = Int32.Parse(text.Substring(idx + 1));
-					text = text.Substring(0, idx);
-				}
-			}
-			answer.ConnectionId = text;
-			return answer;
-		}
-		
-		public static string ToStomp(MessageId id)
-		{
-			return ToStomp(id.ProducerId) + ":" + id.BrokerSequenceId + ":" + id.ProducerSequenceId;
-		}
-		
-		public static MessageId ToMessageId(string text)
-		{
-			if (text == null)
-			{
-				return null;
-			}
-			MessageId answer = new MessageId();
-			int idx = text.LastIndexOf(':');
-			if (idx >= 0) {
-				answer.ProducerSequenceId = Int32.Parse(text.Substring(idx + 1));
-				text = text.Substring(0, idx);
-				idx = text.LastIndexOf(':');
-				if (idx >= 0) {
-					answer.BrokerSequenceId = Int32.Parse(text.Substring(idx + 1));
-					text = text.Substring(0, idx);
-				}
-			}
-			answer.ProducerId = ToProducerId(text);
-			return answer;
-		}
-	
-		public static string ToStomp(TransactionId id)
-		{
-			if (id is LocalTransactionId)
-			{
-				return ToStomp(id as LocalTransactionId);
-			}
-			return id.ToString();
-		}
-		
-		public static string ToStomp(LocalTransactionId transactionId)
-		{
-			return transactionId.ConnectionId.Value + ":" + transactionId.Value;
-		}
-		
-		public static bool ToBool(string text, bool defaultValue)
-		{
-			if (text == null)
-			{
-				return defaultValue;
-			}
-			else
-			{
-				return "true" == text || "TRUE" == text;
-			}
-		}
-    }
-}
+                return null;
+		    }		    
+			int type = ActiveMQDestination.ACTIVEMQ_QUEUE;
+			if (text.StartsWith("/queue/"))
+			{
+				text = text.Substring("/queue/".Length);
+			}
+			else if (text.StartsWith("/topic/"))
+			{
+				text = text.Substring("/topic/".Length);
+				type = ActiveMQDestination.ACTIVEMQ_TOPIC;
+			}
+			else if (text.StartsWith("/temp-topic/"))
+			{
+				text = text.Substring("/temp-topic/".Length);
+				type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_TOPIC;
+			}
+			else if (text.StartsWith("/temp-queue/"))
+			{
+				text = text.Substring("/temp-queue/".Length);
+				type = ActiveMQDestination.ACTIVEMQ_TEMPORARY_QUEUE;
+			}
+			return ActiveMQDestination.CreateDestination(type, text);
+		}
+
+		/// <summary>
+		/// Converts a destination into its STOMP destination name.
+		/// </summary>
+		/// <param name="destination">Destination to convert; may be null.</param>
+		/// <returns>
+		/// null for a null destination; otherwise the physical name prefixed
+		/// with "/topic/", "/temp-topic/", "/temp-queue/" or "/queue/".
+		/// </returns>
+		public static string ToStomp(ActiveMQDestination destination)
+		{
+			if (destination == null)
+			{
+				return null;
+			}
+
+			// Choose the prefix from the destination type; every type without
+			// its own case is addressed as a queue.
+			string prefix;
+			switch (destination.DestinationType)
+			{
+				case DestinationType.Topic:
+					prefix = "/topic/";
+					break;
+
+				case DestinationType.TemporaryTopic:
+					prefix = "/temp-topic/";
+					break;
+
+				case DestinationType.TemporaryQueue:
+					prefix = "/temp-queue/";
+					break;
+
+				default:
+					prefix = "/queue/";
+					break;
+			}
+
+			return prefix + destination.PhysicalName;
+		}
+		
+		/// <summary>
+		/// Renders a consumer id as "connectionId:sessionId:value".
+		/// </summary>
+		/// <param name="id">Consumer id to render; must not be null.</param>
+		public static string ToStomp(ConsumerId id)
+		{
+			return String.Format("{0}:{1}:{2}", id.ConnectionId, id.SessionId, id.Value);
+		}
+		
+		/// <summary>
+		/// Parses text of the form "connectionId:sessionId:value" into a ConsumerId.
+		/// </summary>
+		/// <param name="text">Text to parse; may be null.</param>
+		/// <returns>
+		/// null for null text. Otherwise a ConsumerId whose Value and SessionId
+		/// come from the last and second-to-last ':'-separated segments when
+		/// those parse as integers, and whose ConnectionId is the remaining text.
+		/// </returns>
+		public static ConsumerId ToConsumerId(string text)
+		{
+			if (text == null)
+			{
+				return null;
+			}
+			ConsumerId answer = new ConsumerId();
+			int idx = text.LastIndexOf(':');
+			if (idx >= 0) {
+				try
+				{
+					// Parsed as 64-bit: with Int32.Parse an id above
+					// Int32.MaxValue overflowed, the exception was swallowed
+					// below and the whole text became the connection id.
+					// NOTE(review): relies on ConsumerId.Value and SessionId
+					// being declared as long - confirm against the command class.
+					answer.Value = Int64.Parse(text.Substring(idx + 1));
+					text = text.Substring(0, idx);
+					idx = text.LastIndexOf(':');
+					if (idx >= 0) {
+						try
+						{
+							answer.SessionId = Int64.Parse(text.Substring(idx + 1));
+							text = text.Substring(0, idx);
+						}
+						catch(Exception ex)
+						{
+							// Segment is not a session id: leave it in the
+							// connection id text.
+							Tracer.Debug(ex.Message);
+						}
+					}
+				}
+				catch(Exception ex)
+				{
+					// Trailing segment is not numeric: the text is kept whole
+					// and used as the connection id.
+					Tracer.Debug(ex.Message);
+				}
+			}
+			answer.ConnectionId = text;
+			return answer;
+		}
+		
+		/// <summary>
+		/// Renders a producer id as "connectionId[:sessionId][:value]". A
+		/// session id or value equal to zero is left out of the result.
+		/// </summary>
+		/// <param name="id">Producer id to render; must not be null.</param>
+		public static string ToStomp(ProducerId id)
+		{
+			StringBuilder text = new StringBuilder();
+			text.Append(id.ConnectionId);
+
+			if(id.SessionId != 0)
+			{
+				text.Append(":").Append(id.SessionId);
+			}
+
+			if(id.Value != 0)
+			{
+				text.Append(":").Append(id.Value);
+			}
+
+			return text.ToString();
+		}
+		
+		/// <summary>
+		/// Parses text of the form "connectionId:sessionId:value" into a ProducerId.
+		/// </summary>
+		/// <param name="text">Text to parse; may be null.</param>
+		/// <returns>
+		/// null for null text. Otherwise a ProducerId whose Value and SessionId
+		/// come from the last and second-to-last ':'-separated segments when
+		/// those parse as integers, and whose ConnectionId is the remaining text.
+		/// </returns>
+		public static ProducerId ToProducerId(string text)
+		{
+			if (text == null)
+			{
+				return null;
+			}
+			ProducerId answer = new ProducerId();
+			int idx = text.LastIndexOf(':');
+			if (idx >= 0) {
+				try
+				{
+					// Parsed as 64-bit: with Int32.Parse an id above
+					// Int32.MaxValue overflowed, the exception was swallowed
+					// below and the whole text became the connection id.
+					// NOTE(review): assumes ProducerId.SessionId is declared
+					// as long, like Value - confirm against the command class.
+					answer.Value = Int64.Parse(text.Substring(idx + 1));
+					text = text.Substring(0, idx);
+					idx = text.LastIndexOf(':');
+					if (idx >= 0) {
+						try
+						{
+							answer.SessionId = Int64.Parse(text.Substring(idx + 1));
+							text = text.Substring(0, idx);
+						}
+						catch(Exception ex)
+						{
+							// Segment is not a session id: leave it in the
+							// connection id text.
+							Tracer.Debug(ex.Message);
+						}
+					}
+				}
+				catch(Exception ex)
+				{
+					// Trailing segment is not numeric: the text is kept whole
+					// and used as the connection id.
+					Tracer.Debug(ex.Message);
+				}
+			}
+			answer.ConnectionId = text;
+			return answer;
+		}
+		
+		/// <summary>
+		/// Renders a message id as the STOMP text of its producer id followed
+		/// by ":brokerSequenceId" and ":producerSequenceId". A sequence id
+		/// equal to zero is left out of the result.
+		/// </summary>
+		/// <param name="id">Message id to render; must not be null.</param>
+		public static string ToStomp(MessageId id)
+		{
+			StringBuilder text = new StringBuilder(ToStomp(id.ProducerId));
+
+			if(id.BrokerSequenceId != 0)
+			{
+				text.Append(":").Append(id.BrokerSequenceId);
+			}
+
+			if(id.ProducerSequenceId != 0)
+			{
+				text.Append(":").Append(id.ProducerSequenceId);
+			}
+
+			return text.ToString();
+		}
+		
+		/// <summary>
+		/// Parses text of the form
+		/// "producerId:brokerSequenceId:producerSequenceId" into a MessageId.
+		/// </summary>
+		/// <param name="text">Text to parse; may be null.</param>
+		/// <returns>
+		/// null for null text. Otherwise a MessageId whose ProducerSequenceId
+		/// and BrokerSequenceId come from the last and second-to-last
+		/// ':'-separated segments when those parse as integers, and whose
+		/// ProducerId is parsed from the remaining text by ToProducerId.
+		/// </returns>
+		public static MessageId ToMessageId(string text)
+		{
+			if (text == null)
+			{
+				return null;
+			}
+			MessageId answer = new MessageId();
+			int idx = text.LastIndexOf(':');
+			if (idx >= 0) {
+				try
+				{
+					// Parsed as 64-bit: with Int32.Parse a sequence id above
+					// Int32.MaxValue overflowed, the exception was swallowed
+					// below and the segment was handed on to ToProducerId.
+					// NOTE(review): relies on the MessageId sequence ids being
+					// declared as long - confirm against the command class.
+					answer.ProducerSequenceId = Int64.Parse(text.Substring(idx + 1));
+					text = text.Substring(0, idx);
+					idx = text.LastIndexOf(':');
+					if (idx >= 0) {
+						try
+						{
+							answer.BrokerSequenceId = Int64.Parse(text.Substring(idx + 1));
+							text = text.Substring(0, idx);
+						}
+						catch(Exception ex)
+						{
+							// Segment is not a sequence id: leave it for the
+							// producer id parse.
+							Tracer.Debug(ex.Message);
+						}
+					}
+				}
+				catch(Exception ex)
+				{
+					// Trailing segment is not numeric: the whole text is
+					// treated as the producer id.
+					Tracer.Debug(ex.Message);
+				}
+			}
+			answer.ProducerId = ToProducerId(text);
+			return answer;
+		}
+	
+		/// <summary>
+		/// Renders a transaction id for STOMP. A LocalTransactionId goes
+		/// through the LocalTransactionId overload; any other kind uses its
+		/// own ToString().
+		/// </summary>
+		/// <param name="id">Transaction id to render; must not be null.</param>
+		public static string ToStomp(TransactionId id)
+		{
+			return (id is LocalTransactionId)
+				? ToStomp((LocalTransactionId) id)
+				: id.ToString();
+		}
+		
+		/// <summary>
+		/// Renders a local transaction id as "connectionIdValue:value".
+		/// </summary>
+		/// <param name="transactionId">
+		/// Transaction id to render; it and its ConnectionId must not be null.
+		/// </param>
+		public static string ToStomp(LocalTransactionId transactionId)
+		{
+			return String.Format("{0}:{1}", transactionId.ConnectionId.Value, transactionId.Value);
+		}
+		
+		/// <summary>
+		/// Converts text to a boolean, falling back to a default for null.
+		/// </summary>
+		/// <param name="text">Text to convert; may be null.</param>
+		/// <param name="defaultValue">Value returned when text is null.</param>
+		/// <returns>
+		/// defaultValue for null text; otherwise true only when text is "true"
+		/// in any letter case. "True", the spelling produced by
+		/// Boolean.ToString(), is therefore accepted alongside "true" and "TRUE".
+		/// </returns>
+		public static bool ToBool(string text, bool defaultValue)
+		{
+			if (text == null)
+			{
+				return defaultValue;
+			}
+
+			// Ordinal comparison: culture-specific casing rules must not
+			// change how a protocol token is recognised.
+			return String.Equals(text, "true", StringComparison.OrdinalIgnoreCase);
+		}
+    }
+}

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Stomp/StompWireFormat.cs Mon Oct 29 06:55:09 2007
@@ -15,16 +15,16 @@
  * limitations under the License.
  */
 using System.Reflection;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire.V1;
-using ActiveMQ.Transport;
-using NMS;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.OpenWire.V1;
+using Apache.ActiveMQ.Transport;
+using Apache.NMS;
 using System;
 using System.Collections;
 using System.IO;
 using System.Text;
 
-namespace ActiveMQ.Transport.Stomp
+namespace Apache.ActiveMQ.Transport.Stomp
 {
     /// <summary>
     /// Implements the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
@@ -50,8 +50,7 @@
 
         public void Marshal(Object o, BinaryWriter binaryWriter)
         {
-			Console.WriteLine(">>>> " + o);
-			//Console.Out.Flush();
+			Tracer.Debug(">>>> " + o);
 			StompFrameStream ds = new StompFrameStream(binaryWriter, encoding);
 			
 			if (o is ConnectionInfo)
@@ -91,13 +90,11 @@
 					response.CorrelationId = command.CommandId;
 					SendCommand(response);
 				}
-				Console.WriteLine("#### Ignored command: " + o.GetType());
-                Console.Out.Flush();
+				Tracer.Debug("#### Ignored command: " + o.GetType());
 			}
 			else
 			{
-				Console.WriteLine("#### Ignored command: " + o.GetType());
-                Console.Out.Flush();
+				Tracer.Debug("#### Ignored command: " + o.GetType());
 			}
         }
 
@@ -130,7 +127,7 @@
 			}
 			while (command == "");
 			
-			Console.WriteLine("<<<< command: " + command);
+			Tracer.Debug("<<<< command: " + command);
 			
 			IDictionary headers = new Hashtable();
 			string line;
@@ -143,7 +140,7 @@
 					string value = line.Substring(idx + 1);
 					headers[key] = value;
 					
-					Console.WriteLine("<<<< header: " + key + " = " + value);
+					Tracer.Debug("<<<< header: " + key + " = " + value);
 				}
 				else
 				{
@@ -156,6 +153,12 @@
 			{
 				int size = Int32.Parse(length);
 				content = dis.ReadBytes(size);
+				// Read the terminating NULL byte for this frame.
+				int nullByte = dis.Read();
+				if(nullByte != 0)
+				{
+					Tracer.Debug("<<<< error reading frame null byte.");
+				}
 			}
 			else
 			{
@@ -173,8 +176,7 @@
                 content = ms.ToArray();
 			}
 			Object answer = CreateCommand(command, headers, content);
-			Console.WriteLine("<<<< received: " + answer);
-			Console.Out.Flush();
+			Tracer.Debug("<<<< received: " + answer);
 			return answer;
         }
 
@@ -217,7 +219,7 @@
 			{
 				return ReadMessage(command, headers, content);
 			}
-			Console.WriteLine("Unknown command: " + command + " headers: " + headers);
+			Tracer.Error("Unknown command: " + command + " headers: " + headers);
 			return null;
 		}
 		
@@ -319,7 +321,10 @@
             ss.WriteHeader("selector", command.Selector);
             if ( command.NoLocal )
                 ss.WriteHeader("no-local", command.NoLocal);
-			ss.WriteHeader("ack", "client");
+
+			if ( AcknowledgementMode.ClientAcknowledge == command.AcknowledgementMode
+				|| AcknowledgementMode.AutoClientAcknowledge == command.AcknowledgementMode )
+				ss.WriteHeader("ack", "client");
 
 			// ActiveMQ extensions to STOMP
 			ss.WriteHeader("activemq.dispatchAsync", command.DispatchAsync);
@@ -328,7 +333,7 @@
 		    
 			ss.WriteHeader("activemq.maximumPendingMessageLimit", command.MaximumPendingMessageLimit);
 			ss.WriteHeader("activemq.prefetchSize", command.PrefetchSize);
-			ss.WriteHeader("activemq.priority ", command.Priority);
+			ss.WriteHeader("activemq.priority", command.Priority);
             if ( command.Retroactive )
 			    ss.WriteHeader("activemq.retroactive", command.Retroactive);
 
@@ -393,12 +398,10 @@
 						type = "ABORT";
 						break;
 				}
-				Console.WriteLine(">>> For transaction type: " + transactionType + " we are using command type: " + type);
-				
+
+				Tracer.Debug(">>> For transaction type: " + transactionType + " we are using command type: " + type);
 				ss.WriteCommand(command, type);
-				
 				ss.WriteHeader("transaction", StompHelper.ToStomp(id));
-				
 				ss.Flush();
 			}
 		}
@@ -433,7 +436,14 @@
 			else
 			{
 				ss.Content = command.Content;
-				ss.ContentLength = command.Content.Length;
+				if(null != command.Content)
+				{
+					ss.ContentLength = command.Content.Length;
+				}
+				else
+				{
+					ss.ContentLength = 0;
+				}
 			}
 	
 			IPrimitiveMap map = command.Properties;
@@ -460,7 +470,7 @@
 		{
 			if (transport == null)
 			{
-				Console.WriteLine("No transport configured so cannot return command: " + command);
+				Tracer.Fatal("No transport configured so cannot return command: " + command);
 			}
 			else
 			{

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransport.cs Mon Oct 29 06:55:09 2007
@@ -14,18 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.OpenWire;
+using Apache.ActiveMQ.Transport;
 using System;
-using System.Collections;
 using System.IO;
-using System.Net;
 using System.Net.Sockets;
 using System.Threading;
 
-namespace ActiveMQ.Transport.Tcp
+namespace Apache.ActiveMQ.Transport.Tcp
 {
 	
     /// <summary>
@@ -33,11 +30,14 @@
     /// </summary>
     public class TcpTransport : ITransport
     {
-        private Socket socket;
-        private IWireFormat wireformat = new OpenWireFormat();
+		private readonly object initLock = new object();	// per-instance monitor; a string literal is interned and would be shared by every transport
+        private readonly Socket socket;
+    	private IWireFormat wireformat;
         private BinaryReader socketReader;
+		private readonly object socketReaderLock = new object();	// per-instance monitor for socketReader
         private BinaryWriter socketWriter;
-        private Thread readThread;
+		private readonly object socketWriterLock = new object();	// per-instance monitor for socketWriter
+		private Thread readThread;
         private bool started;
         private Util.AtomicBoolean closed = new Util.AtomicBoolean(false);
         
@@ -55,30 +55,62 @@
         /// </summary>
         public void Start()
         {
-            if (!started)
-            {
-                if( commandHandler == null )
-                    throw new InvalidOperationException ("command cannot be null when Start is called.");
-                if( exceptionHandler == null )
-                    throw new InvalidOperationException ("exception cannot be null when Start is called.");
+			lock (initLock)
+			{
+				if (!started)
+				{
+					if (null == commandHandler)
+					{
+                		throw new InvalidOperationException(
+                				"command cannot be null when Start is called.");
+					}
 
-                started = true;
-                
-                // As reported in AMQ-988 it appears that NetworkStream is not thread safe
-                // so lets use an instance for each of the 2 streams
-                socketWriter = new OpenWireBinaryWriter(new NetworkStream(socket));
-                socketReader = new OpenWireBinaryReader(new NetworkStream(socket));
-                
-                // now lets create the background read thread
-                readThread = new Thread(new ThreadStart(ReadLoop));
-                readThread.Start();
-            }
+					if (null == exceptionHandler)
+            		{
+            			throw new InvalidOperationException(
+            					"exception cannot be null when Start is called.");
+            		}
+
+            		started = true;
+	                
+					// As reported in AMQ-988 it appears that NetworkStream is not thread safe
+					// so lets use an instance for each of the 2 streams
+					socketWriter = new OpenWireBinaryWriter(new NetworkStream(socket));
+					socketReader = new OpenWireBinaryReader(new NetworkStream(socket));
+	                
+					// now lets create the background read thread
+					readThread = new Thread(ReadLoop);
+					readThread.Start();
+				}
+			}
         }
         
         public void Oneway(Command command)
         {
-            Wireformat.Marshal(command, socketWriter);
-            socketWriter.Flush();
+			lock (socketWriterLock)
+			{
+				try
+				{
+					Wireformat.Marshal(command, socketWriter);
+					socketWriter.Flush();
+				}
+				catch(Exception ex)
+				{
+					if (command.ResponseRequired)
+					{
+						// Make sure that something higher up doesn't get blocked.
+						// Respond with an exception.
+						ExceptionResponse er = new ExceptionResponse();
+						BrokerError error = new BrokerError();
+
+						error.Message = "Transport connection error: " + ex.Message;
+						error.ExceptionClass = ex.ToString();
+						er.Exception = error;
+						er.CorrelationId = command.CommandId;
+						commandHandler(this, er);
+					}
+				}
+			}
         }
         
         public FutureResponse AsyncRequest(Command command)
@@ -93,14 +125,50 @@
 
         public void Close()
         {
-            if (closed.CompareAndSet(false, true))
-            {
-                socket.Close();
-                if (System.Threading.Thread.CurrentThread != readThread)
-                    readThread.Join();
-                socketWriter.Close();
-                socketReader.Close();
-            }
+			lock (initLock)
+			{
+				if (closed.CompareAndSet(false, true))
+				{
+					try
+					{
+						socket.Shutdown(SocketShutdown.Both);
+					}
+					catch
+					{
+					}
+
+					lock (socketWriterLock)
+					{
+						if(null != socketWriter)
+						{
+            				socketWriter.Close();
+							socketWriter = null;
+						}
+					}
+
+					lock (socketReaderLock)
+					{
+						if(null != socketReader)
+						{
+							socketReader.Close();
+							socketReader = null;
+						}
+					}
+
+					socket.Close();
+
+					if(null != readThread
+						&& Thread.CurrentThread != readThread
+						&& readThread.IsAlive)
+					{
+						readThread.Abort();
+						readThread.Join();
+						readThread = null;
+					}
+				}
+
+				started = false;
+			}
         }
 
         public void Dispose()
@@ -133,11 +201,11 @@
                 }
                 catch(Exception ex)
                 {
-                    if( !closed.Value )
+                    if (!closed.Value)
                     {
-                        this.exceptionHandler(this, ex);
-                        // Close the socket as there's little that can be done with this transport now.
-                        Close();
+						// Close the socket as there's little that can be done with this transport now.
+						Close();
+						this.exceptionHandler(this, ex);
                         break;
                     }
                 }
@@ -149,7 +217,7 @@
 						this.commandHandler(this, command);
 					}
                 }
-                catch ( Exception e)
+                catch (Exception e)
                 {
                     this.exceptionHandler(this, e);
                 }
@@ -158,12 +226,14 @@
                 
         // Implementation methods
                 
-        public CommandHandler Command {
+        public CommandHandler Command
+		{
             get { return commandHandler; }
             set { this.commandHandler = value; }
         }
 
-        public  ExceptionHandler Exception {
+        public  ExceptionHandler Exception
+		{
             get { return exceptionHandler; }
             set { this.exceptionHandler = value; }
         }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/Tcp/TcpTransportFactory.cs Mon Oct 29 06:55:09 2007
@@ -18,88 +18,106 @@
 using System;
 using System.Net;
 using System.Net.Sockets;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire;
-using ActiveMQ.Transport;
-using ActiveMQ.Transport.Stomp;
-using ActiveMQ.Util;
-
-namespace ActiveMQ.Transport.Tcp {
-        public class TcpTransportFactory : ITransportFactory
-        {
-                private bool useLogging = false;
-
-                public bool UseLogging {
-                        get { return useLogging; } 
-                        set { useLogging = value; } 
-                }
-
-                public ITransport CreateTransport(Uri location)
-                {
-                        // Extract query parameters from broker Uri
-                        System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(location.Query);
-
-                        // Set transport. properties on this (the factory)
-                        URISupport.SetProperties(this, map, "transport.");
-
-                        // Console.WriteLine("Opening socket to: " + host + " on port: " + port);
-                        Socket socket = Connect(location.Host, location.Port);
-						IWireFormat wireformat = CreateWireFormat(location, map);
-                        TcpTransport tcpTransport = new TcpTransport(socket, wireformat);
-						wireformat.Transport = tcpTransport;
-                        ITransport rc = tcpTransport;
-
-                        if (UseLogging)
-                        {
-                                rc = new LoggingTransport(rc);
-                        }
-
-						if (wireformat is OpenWireFormat)
-						{
-	                        rc = new WireFormatNegotiator(rc, (OpenWireFormat) wireformat);
-						}
-                        rc = new MutexTransport(rc);
-                        rc = new ResponseCorrelator(rc);
-
-                        return rc;
-                }
-
-                protected Socket Connect(string host, int port)
-                {
-                        // Looping through the AddressList allows different type of connections to be tried
-                        // (IPv4, IPv6 and whatever else may be available).
-                        IPHostEntry hostEntry = Dns.GetHostByName(host);
-                        foreach (IPAddress address in hostEntry.AddressList)
-                        {
-                                Socket socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
-                                socket.Connect(new IPEndPoint(address, port));
-                                if (socket.Connected)
-                                {
-                                        return socket;
-                                }
-                        }
-                        throw new SocketException();
-                }
-
-				protected IWireFormat CreateWireFormat(Uri location, System.Collections.Specialized.StringDictionary map)
+using System.Collections.Specialized;
+using Apache.ActiveMQ.OpenWire;
+using Apache.ActiveMQ.Transport.Stomp;
+using Apache.ActiveMQ.Util;
+using Apache.NMS;
+
+namespace Apache.ActiveMQ.Transport.Tcp
+{
+	public class TcpTransportFactory : ITransportFactory
+	{
+		private bool useLogging = false;
+
+		public TcpTransportFactory()
+		{
+		}
+
+		public bool UseLogging
+		{
+			get { return useLogging; }
+			set { useLogging = value; }
+		}
+
+		#region ITransportFactory Members
+
+		/// <summary>
+		/// Builds the transport chain for the broker at <paramref name="location"/>.
+		/// </summary>
+		/// <param name="location">Broker URI; its query string supplies the "transport." options.</param>
+		/// <returns>The outermost transport of the assembled chain.</returns>
+		public ITransport CreateTransport(Uri location)
+		{
+			// Query options prefixed with "transport." are applied to this factory itself.
+			StringDictionary options = URISupport.ParseQuery(location.Query);
+			URISupport.SetProperties(this, options, "transport.");
+
+			Tracer.Debug("Opening socket to: " + location.Host + " on port: " + location.Port);
+			Socket connection = Connect(location.Host, location.Port);
+			IWireFormat format = CreateWireFormat(location, options);
+
+			// The wire format is pointed at the raw TCP transport, before any wrapper is added.
+			ITransport chain = new TcpTransport(connection, format);
+			format.Transport = chain;
+
+			if(UseLogging)
+			{
+				chain = new LoggingTransport(chain);
+			}
+
+			// Only an OpenWire format gets a negotiation layer.
+			OpenWireFormat openWire = format as OpenWireFormat;
+			if(null != openWire)
+			{
+				chain = new WireFormatNegotiator(chain, openWire);
+			}
+
+			// Mutex first, response correlation outermost.
+			return new ResponseCorrelator(new MutexTransport(chain));
+		}
+
+		#endregion
+
+		/// <summary>
+		/// Resolves <paramref name="host"/> and returns a connected TCP socket to the first
+		/// resolved address that accepts a connection on <paramref name="port"/>.
+		/// </summary>
+		/// <param name="host">Host name or address to resolve.</param>
+		/// <param name="port">TCP port to connect to.</param>
+		/// <returns>A connected socket; the caller owns it.</returns>
+		/// <exception cref="SocketException">
+		/// No resolved address could be connected. When at least one attempt failed, the
+		/// exception carries the error code of the last failed attempt.
+		/// </exception>
+		protected Socket Connect(string host, int port)
+		{
+			// Looping through the AddressList allows different type of connections to be tried
+			// (IPv4, IPv6 and whatever else may be available).
+			IPHostEntry hostEntry = Dns.GetHostEntry(host);
+			bool attemptFailed = false;
+			int lastErrorCode = 0;
+
+			foreach(IPAddress address in hostEntry.AddressList)
+			{
+				Socket socket = null;
+				try
+				{
+					// Both creating the socket (unsupported address family) and connecting
+					// can throw; either way the next address must still get its turn.
+					socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
+					socket.Connect(new IPEndPoint(address, port));
+				}
+				catch(SocketException ex)
+				{
+					attemptFailed = true;
+					lastErrorCode = ex.ErrorCode;
+					if(null != socket)
+					{
+						socket.Close();
+					}
+					continue;
+				}
+
+				if(socket.Connected)
 				{
-					// TODO detect STOMP etc
-					if ("stomp".Equals(location.Scheme)) 
-					{
-						IWireFormat answer = new StompWireFormat();
-
-	                    // Set wireformat. properties on the wireformat owned by the tcpTransport
-	                    URISupport.SetProperties(answer, map, "wireFormat.");
-						return answer;
-					}
-					else 
-					{
-						OpenWireFormat answer = new OpenWireFormat();
-
-	                    // Set wireformat. properties on the wireformat owned by the tcpTransport
-	                    URISupport.SetProperties(answer.PreferedWireFormatInfo, map, "wireFormat.");
-						return answer;
-					}
+					return socket;
 				}
-        }
+
+				// Connect returned without a live connection: release the handle before
+				// moving on to the next address.
+				socket.Close();
+			}
+
+			if(attemptFailed)
+			{
+				// Report the real reason of the last failed attempt.
+				throw new SocketException(lastErrorCode);
+			}
+			throw new SocketException();
+		}
+
+		/// <summary>
+		/// Chooses the wire format for <paramref name="location"/> and applies the
+		/// "wireFormat." options from <paramref name="map"/> to it.
+		/// </summary>
+		/// <param name="location">Broker URI; a "stomp" scheme selects STOMP, any other scheme selects OpenWire.</param>
+		/// <param name="map">Parsed query parameters of the broker URI.</param>
+		/// <returns>The wire format that was created.</returns>
+		protected IWireFormat CreateWireFormat(Uri location, StringDictionary map)
+		{
+			object properties = null;
+			IWireFormat wireFormat = null;
+
+			// Detect STOMP etc. A URI scheme is a protocol token, so it is compared ordinally
+			// instead of with the casing rules of the current culture.
+			if(String.Equals(location.Scheme, "stomp", StringComparison.OrdinalIgnoreCase))
+			{
+				// STOMP options are set directly on the wire format object.
+				wireFormat = new StompWireFormat();
+				properties = wireFormat;
+			}
+			else
+			{
+				OpenWireFormat openwireFormat = new OpenWireFormat();
+
+				wireFormat = openwireFormat;
+				// OpenWire options are set on the preferred wire format info, not on the format itself.
+				properties = openwireFormat.PreferedWireFormatInfo;
+			}
+
+			// Guard kept on purpose: PreferedWireFormatInfo is read from another class and
+			// nothing here shows that it cannot be null.
+			if(null != properties)
+			{
+				// Set wireformat. properties on the wireformat owned by the tcpTransport
+				URISupport.SetProperties(properties, map, "wireFormat.");
+			}
+
+			return wireFormat;
+		}
+	}
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/TransportFilter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/TransportFilter.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/TransportFilter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/TransportFilter.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ.Commands;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.Transport;
 using System;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 	
 	/// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Transport/WireFormatNegotiator.cs Mon Oct 29 06:55:09 2007
@@ -16,13 +16,13 @@
  */
 using System.IO;
 using System.Threading;
-using ActiveMQ.Commands;
-using ActiveMQ.OpenWire;
-using ActiveMQ.Transport;
+using Apache.ActiveMQ.Commands;
+using Apache.ActiveMQ.OpenWire;
+using Apache.ActiveMQ.Transport;
 using System;
-using ActiveMQ.Util;
+using Apache.ActiveMQ.Util;
 
-namespace ActiveMQ.Transport
+namespace Apache.ActiveMQ.Transport
 {
 	
     /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/AtomicBoolean.cs Mon Oct 29 06:55:09 2007
@@ -17,7 +17,7 @@
 using System;
 using System.Text;
 
-namespace ActiveMQ.Util
+namespace Apache.ActiveMQ.Util
 {
     public class AtomicBoolean
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/CountDownLatch.cs Mon Oct 29 06:55:09 2007
@@ -17,7 +17,7 @@
 using System;
 using System.Threading;
 
-namespace ActiveMQ.Util
+namespace Apache.ActiveMQ.Util
 {
     class CountDownLatch
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/DateUtils.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/DateUtils.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/DateUtils.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/DateUtils.cs Mon Oct 29 06:55:09 2007
@@ -16,39 +16,49 @@
  */
 using System;
 
-namespace ActiveMQ.Util
+namespace Apache.ActiveMQ.Util
 {
-	internal class DateUtils
+	public class DateUtils
 	{
 		/// <summary>
-		/// The difference between the Windows epoch (1601-01-01 00:00:00)
-		/// and the Unix epoch (1970-01-01 00:00:00) in milliseconds.
+		/// The start of the Windows epoch
 		/// </summary>
-		public static readonly long EPOCH_DIFF = 11644473600000L;
-
-        /// <summary>
-        /// The start of the UNIX epoch
-        /// </summary>
-        public static readonly DateTime UNIX_EPOCH = new DateTime(1970, 1, 1, 0, 0, 0, 0);
+		public static readonly DateTime windowsEpoch = new DateTime(1601, 1, 1, 0, 0, 0, 0);
+		/// <summary>
+		/// The start of the Java epoch
+		/// </summary>
+		public static readonly DateTime javaEpoch = new DateTime(1970, 1, 1, 0, 0, 0, 0);
 		
 		/// <summary>
-		/// Method ToJavaTime
+		/// The difference between the Windows epoch and the Java epoch
+		/// in milliseconds.
 		/// </summary>
-		/// <param name="timeToLive">A  TimeSpan</param>
-		/// <returns>A  long</retutns>
-		public static long ToJavaTime(TimeSpan timeToLive)
+		public static readonly long epochDiff; /* = 11644473600000L; */
+
+		static DateUtils()
+		{
+			epochDiff = (javaEpoch.ToFileTimeUtc() - windowsEpoch.ToFileTimeUtc())
+							/ TimeSpan.TicksPerMillisecond;
+		}
+
+		public static long ToJavaTime(DateTime dateTime)
+		{
+			return (dateTime.ToFileTime() / TimeSpan.TicksPerMillisecond) - epochDiff;
+		}
+
+		public static DateTime ToDateTime(long javaTime)
 		{
-			return ToJavaTime(new DateTime(timeToLive.Ticks));
+			return DateTime.FromFileTime((javaTime + epochDiff) * TimeSpan.TicksPerMillisecond);
 		}
 
-	    public static long ToJavaTime(DateTime dateTime)
+		public static long ToJavaTimeUtc(DateTime dateTime)
 		{
-			return dateTime.ToFileTime() - EPOCH_DIFF;
+			return (dateTime.ToFileTimeUtc() / TimeSpan.TicksPerMillisecond) - epochDiff;
 		}
 
-        public static DateTime ToDateTime(long dateTime)
-        {
-            return UNIX_EPOCH.AddMilliseconds(dateTime);
-        }
+		public static DateTime ToDateTimeUtc(long javaTime)
+		{
+			return DateTime.FromFileTimeUtc((javaTime + epochDiff) * TimeSpan.TicksPerMillisecond);
+		}
 	}
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/URISupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/URISupport.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/URISupport.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Util/URISupport.cs Mon Oct 29 06:55:09 2007
@@ -19,7 +19,7 @@
 using System.Globalization;
 using System.Text;
 
-namespace ActiveMQ.Util
+namespace Apache.ActiveMQ.Util
 {
 	/// <summary>
 	/// Class to provide support for URI query parameters which uses .Net reflection

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BaseMessage.cs Mon Oct 29 06:55:09 2007
@@ -14,19 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using NMS;
+using Apache.NMS;
 using System;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
 	public delegate void AcknowledgeHandler(BaseMessage baseMessage);
 
     public class BaseMessage : IMessage
     {
-        private PrimitiveMap properties;
+        private PrimitiveMap propertiesMap = new PrimitiveMap();
         private IDestination destination;
         private string correlationId;
-        private TimeSpan expiration;
+        private TimeSpan timeToLive;
         private string messageId;
         private bool persistent;
         private byte priority;
@@ -34,7 +34,7 @@
         private string type;
         private event AcknowledgeHandler Acknowledger;
         private byte[] content;
-        private DateTime timestamp;
+        private DateTime timestamp = new DateTime();
 
         public byte[] Content
         {
@@ -60,7 +60,7 @@
         public IPrimitiveMap Properties
         {
             get {
-                return properties;
+				return propertiesMap;
             }
         }
         
@@ -98,13 +98,13 @@
         /// <summary>
         /// The time in milliseconds that this message should expire in
         /// </summary>
-        public TimeSpan NMSExpiration
+        public TimeSpan NMSTimeToLive
         {
             get {
-                return expiration;
+				return timeToLive;
             }
             set {
-                expiration = value;
+				timeToLive = value;
             }
         }
         

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BytesMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BytesMessage.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BytesMessage.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/BytesMessage.cs Mon Oct 29 06:55:09 2007
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 using System;
-using NMS;
+using Apache.NMS;
 
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
 	public class BytesMessage : BaseMessage, IBytesMessage
     {

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/CommonAssemblyInfo.cs Mon Oct 29 06:55:09 2007
@@ -6,7 +6,7 @@
 //------------------------------------------------------------------------------
 // <auto-generated>
 //     This code was generated by a tool.
-//     Runtime Version:2.0.50727.42
+//     Runtime Version:2.0.50727.832
 //
 //     Changes to this file may cause incorrect behavior and will be lost if
 //     the code is regenerated.

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Connection.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Connection.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Connection.cs Mon Oct 29 06:55:09 2007
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using ActiveMQ;
-using NMS;
+using Apache.NMS;
 using System;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
     /// <summary>
     /// Represents a NMS connection MSMQ.  Since the underlying MSMQ APIs are actually

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ConnectionFactory.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/ConnectionFactory.cs Mon Oct 29 06:55:09 2007
@@ -14,31 +14,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using NMS;
 using System;
+using Apache.NMS;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
     /// <summary>
     /// A Factory that can estbalish NMS connections to MSMQ
     /// </summary>
     public class ConnectionFactory : IConnectionFactory
-    {   
-        //
-        // Creates a connection to MSMQ
-        //
-        public IConnection CreateConnection()
+    {
+		public ConnectionFactory()
+		{
+		}
+
+		public ConnectionFactory(Uri brokerUri, string clientID)
+		{
+		}
+
+    	/// <summary>
+		/// Creates a new connection to MSMQ.
+		/// </summary>
+		public IConnection CreateConnection()
         {
             return new Connection();
         }
-        
-        //
-        // Creates a connection to MSQM
-        //
-        public IConnection CreateConnection(string userName, string password)
+
+		/// <summary>
+		/// Creates a new connection to MSMQ.
+		/// </summary>
+		public IConnection CreateConnection(string userName, string password)
         {
             return new Connection();
         }
-                
-    }
+
+		/// <summary>
+		/// Creates a new connection to MSMQ.
+		/// </summary>
+		public IConnection CreateConnection(string userName, string password, bool useLogging)
+		{
+			return new Connection();
+		}
+	}
 }

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/DefaultMessageConverter.cs Mon Oct 29 06:55:09 2007
@@ -17,9 +17,9 @@
 using System;
 using System.Text;
 using System.Messaging;
-using NMS;
+using Apache.NMS;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
     public class DefaultMessageConverter : IMessageConverter
 	{
@@ -34,7 +34,7 @@
             }
             //if (message.NMSExpiration != null)
             //{
-                answer.TimeToBeReceived = message.NMSExpiration;
+                answer.TimeToBeReceived = message.NMSTimeToLive;
             //}
             if (message.NMSCorrelationID != null)
             {
@@ -58,7 +58,7 @@
 			answer.NMSDestination = ToNmsDestination(message.DestinationQueue);
 			answer.NMSType = message.Label;
 			answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue);
-			answer.NMSExpiration = message.TimeToBeReceived;
+			answer.NMSTimeToLive = message.TimeToBeReceived;
             return answer;
         }
 		

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Destination.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Destination.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/Destination.cs Mon Oct 29 06:55:09 2007
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using NMS;
+using Apache.NMS;
 using System;
-namespace MSMQ
+namespace Apache.MSMQ
 {
     
     /// <summary>

Modified: activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs?rev=589629&r1=589628&r2=589629&view=diff
==============================================================================
--- activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs (original)
+++ activemq/activemq-dotnet/trunk/src/main/csharp/MSMQ/IMessageConverter.cs Mon Oct 29 06:55:09 2007
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 using System.Messaging;
-using NMS;
+using Apache.NMS;
 
-namespace MSMQ
+namespace Apache.MSMQ
 {
     public interface IMessageConverter
     {



Mime
View raw message