activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r799407 [29/29] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp: ./ Commands/ OpenWire/ OpenWire/V1/ OpenWire/V2/ OpenWire/V3/ OpenWire/V4/ OpenWire/V5/ State/ Transport/
Date Thu, 30 Jul 2009 19:06:44 GMT
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Thu Jul 30 19:06:34 2009
@@ -23,610 +23,608 @@
 
 namespace Apache.NMS.ActiveMQ
 {
-	/// <summary>
-	/// Default provider of ISession
-	/// </summary>
-	public class Session : ISession
-	{
-		/// <summary>
-		/// Private object used for synchronization, instead of public "this"
-		/// </summary>
-		private readonly object myLock = new object();
-		private int consumerCounter;
-		private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
-		private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
-		private readonly DispatchingThread dispatchingThread;
-		private DispatchingThread.ExceptionHandler dispatchingThread_ExceptionHandler;
-		private readonly SessionInfo info;
-		private int producerCounter;
-		internal bool startedAsyncDelivery = false;
-		private bool disposed = false;
-		private bool closed = false;
-		private bool closing = false;
-		private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
-
-		public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
-		{
-			this.connection = connection;
-			this.info = info;
-			this.acknowledgementMode = acknowledgementMode;
-			this.AsyncSend = connection.AsyncSend;
-			this.requestTimeout = connection.RequestTimeout;
-			this.PrefetchSize = 1000;
-			this.transactionContext = new TransactionContext(this);
-			this.dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
-			this.dispatchingThread_ExceptionHandler = new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
-		}
-
-		~Session()
-		{
-			Dispose(false);
-		}
-
-		/// <summary>
-		/// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
-		/// until acknowledgements are received.
-		/// </summary>
-		public int PrefetchSize;
-
-		/// <summary>
-		/// Sets the maximum number of messages to keep around per consumer
-		/// in addition to the prefetch window for non-durable topics until messages
-		/// will start to be evicted for slow consumers.
-		/// Must be > 0 to enable this feature
-		/// </summary>
-		public int MaximumPendingMessageLimit;
-
-		/// <summary>
-		/// Enables or disables whether asynchronous dispatch should be used by the broker
-		/// </summary>
-		public bool DispatchAsync;
-
-		/// <summary>
-		/// Enables or disables exclusive consumers when using queues. An exclusive consumer means
-		/// only one instance of a consumer is allowed to process messages on a queue to preserve order
-		/// </summary>
-		public bool Exclusive;
-
-		/// <summary>
-		/// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
-		/// </summary>
-		public bool Retroactive;
-
-		/// <summary>
-		/// Sets the default consumer priority for consumers
-		/// </summary>
-		public byte Priority;
-
-		/// <summary>
-		/// This property indicates whether or not async send is enabled.
-		/// </summary>
-		public bool AsyncSend;
-
-		private Connection connection;
-		public Connection Connection
-		{
-			get { return this.connection; }
-		}
-
-		public SessionId SessionId
-		{
-			get { return info.SessionId; }
-		}
-
-		private TransactionContext transactionContext;
-		public TransactionContext TransactionContext
-		{
-			get { return this.transactionContext; }
-		}
-
-		#region ISession Members
-
-		public void Dispose()
-		{
-			Dispose(true);
-			GC.SuppressFinalize(this);
-		}
-
-		protected void Dispose(bool disposing)
-		{
-			if(this.disposed)
-			{
-				return;
-			}
-
-			if(disposing)
-			{
-				// Dispose managed code here.
-			}
-
-			try
-			{
-				Close();
-			}
-			catch
-			{
-				// Ignore network errors.
-			}
-
-			this.disposed = true;
-		}
-
-		public void Close()
-		{
-			lock(myLock)
-			{
-				if(this.closed)
-				{
-					return;
-				}
-
-				try
-				{
-					this.closing = true;
-					StopAsyncDelivery();
-					lock(consumers.SyncRoot)
-					{
-						foreach(MessageConsumer consumer in consumers.Values)
-						{
-							consumer.Close();
-						}
-					}
-					consumers.Clear();
-
-					lock(producers.SyncRoot)
-					{
-						foreach(MessageProducer producer in producers.Values)
-						{
-							producer.Close();
-						}
-					}
-					producers.Clear();
+    /// <summary>
+    /// Default provider of ISession
+    /// </summary>
+    public class Session : ISession
+    {
+        /// <summary>
+        /// Private object used for synchronization, instead of public "this"
+        /// </summary>
+        private readonly object myLock = new object();
+        private int consumerCounter;
+        private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
+        private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
+        private readonly DispatchingThread dispatchingThread;
+        private DispatchingThread.ExceptionHandler dispatchingThread_ExceptionHandler;
+        private readonly SessionInfo info;
+        private int producerCounter;
+        internal bool startedAsyncDelivery = false;
+        private bool disposed = false;
+        private bool closed = false;
+        private bool closing = false;
+        private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+
+        public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
+        {
+            this.connection = connection;
+            this.info = info;
+            this.acknowledgementMode = acknowledgementMode;
+            this.AsyncSend = connection.AsyncSend;
+            this.requestTimeout = connection.RequestTimeout;
+            this.PrefetchSize = 1000;
+            this.transactionContext = new TransactionContext(this);
+            this.dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages));
+            this.dispatchingThread_ExceptionHandler = new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
+        }
+
+        ~Session()
+        {
+            Dispose(false);
+        }
+
+        /// <summary>
+        /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers
+        /// until acknowledgements are received.
+        /// </summary>
+        public int PrefetchSize;
+
+        /// <summary>
+        /// Sets the maximum number of messages to keep around per consumer
+        /// in addition to the prefetch window for non-durable topics until messages
+        /// will start to be evicted for slow consumers.
+        /// Must be > 0 to enable this feature
+        /// </summary>
+        public int MaximumPendingMessageLimit;
+
+        /// <summary>
+        /// Enables or disables whether asynchronous dispatch should be used by the broker
+        /// </summary>
+        public bool DispatchAsync;
+
+        /// <summary>
+        /// Enables or disables exclusive consumers when using queues. An exclusive consumer means
+        /// only one instance of a consumer is allowed to process messages on a queue to preserve order
+        /// </summary>
+        public bool Exclusive;
+
+        /// <summary>
+        /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not?
+        /// </summary>
+        public bool Retroactive;
+
+        /// <summary>
+        /// Sets the default consumer priority for consumers
+        /// </summary>
+        public byte Priority;
+
+        /// <summary>
+        /// This property indicates whether or not async send is enabled.
+        /// </summary>
+        public bool AsyncSend;
+
+        private Connection connection;
+        public Connection Connection
+        {
+            get { return this.connection; }
+        }
+
+        public SessionId SessionId
+        {
+            get { return info.SessionId; }
+        }
+
+        private TransactionContext transactionContext;
+        public TransactionContext TransactionContext
+        {
+            get { return this.transactionContext; }
+        }
+
+        #region ISession Members
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected void Dispose(bool disposing)
+        {
+            if(this.disposed)
+            {
+                return;
+            }
+
+            if(disposing)
+            {
+                // Dispose managed code here.
+            }
+
+            try
+            {
+                Close();
+            }
+            catch
+            {
+                // Ignore network errors.
+            }
+
+            this.disposed = true;
+        }
+
+        public void Close()
+        {
+            lock(myLock)
+            {
+                if(this.closed)
+                {
+                    return;
+                }
+
+                try
+                {
+                    this.closing = true;
+                    StopAsyncDelivery();
+                    lock(consumers.SyncRoot)
+                    {
+                        foreach(MessageConsumer consumer in consumers.Values)
+                        {
+                            consumer.Close();
+                        }
+                    }
+                    consumers.Clear();
+
+                    lock(producers.SyncRoot)
+                    {
+                        foreach(MessageProducer producer in producers.Values)
+                        {
+                            producer.Close();
+                        }
+                    }
+                    producers.Clear();
                     Connection.RemoveSession(this);
-				}
-				catch(Exception ex)
-				{
-					Tracer.ErrorFormat("Error during session close: {0}", ex);
-				}
-				finally
-				{
-					this.connection = null;
-					this.closed = true;
-					this.closing = false;
-				}
-			}
-		}
-
-		public IMessageProducer CreateProducer()
-		{
-			return CreateProducer(null);
-		}
-
-		public IMessageProducer CreateProducer(IDestination destination)
-		{
-			ProducerInfo command = CreateProducerInfo(destination);
-			ProducerId producerId = command.ProducerId;
-			MessageProducer producer = null;
-
-			try
-			{
-				producer = new MessageProducer(this, command);
-				producers[producerId] = producer;
-				this.DoSend(command);
-			}
-			catch(Exception)
-			{
-				if(producer != null)
-				{
-					producer.Close();
-				}
-
-				throw;
-			}
-
-			return producer;
-		}
-
-		public IMessageConsumer CreateConsumer(IDestination destination)
-		{
-			return CreateConsumer(destination, null, false);
-		}
-
-		public IMessageConsumer CreateConsumer(IDestination destination, string selector)
-		{
-			return CreateConsumer(destination, selector, false);
-		}
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during session close: {0}", ex);
+                }
+                finally
+                {
+                    this.connection = null;
+                    this.closed = true;
+                    this.closing = false;
+                }
+            }
+        }
+
+        public IMessageProducer CreateProducer()
+        {
+            return CreateProducer(null);
+        }
+
+        public IMessageProducer CreateProducer(IDestination destination)
+        {
+            ProducerInfo command = CreateProducerInfo(destination);
+            ProducerId producerId = command.ProducerId;
+            MessageProducer producer = null;
+
+            try
+            {
+                producer = new MessageProducer(this, command);
+                producers[producerId] = producer;
+                this.DoSend(command);
+            }
+            catch(Exception)
+            {
+                if(producer != null)
+                {
+                    producer.Close();
+                }
+
+                throw;
+            }
+
+            return producer;
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination)
+        {
+            return CreateConsumer(destination, null, false);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector)
+        {
+            return CreateConsumer(destination, selector, false);
+        }
 
-		public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
-		{
+        public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
+        {
             if (destination == null)
             {
                 throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
             }
 
-			ConsumerInfo command = CreateConsumerInfo(destination, selector);
-			command.NoLocal = noLocal;
-			command.AcknowledgementMode = this.AcknowledgementMode;
-
-			ConsumerId consumerId = command.ConsumerId;
-			MessageConsumer consumer = null;
-
-			try
-			{
-				consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
-				// lets register the consumer first in case we start dispatching messages immediately
-				consumers[consumerId] = consumer;
-				this.DoSend(command);
-				return consumer;
-			}
-			catch(Exception)
-			{
-				if(consumer != null)
-				{
-					consumer.Close();
-				}
-
-				throw;
-			}
-		}
+            ConsumerInfo command = CreateConsumerInfo(destination, selector);
+            command.NoLocal = noLocal;
+            ConsumerId consumerId = command.ConsumerId;
+            MessageConsumer consumer = null;
+
+            try
+            {
+                consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
+                // lets register the consumer first in case we start dispatching messages immediately
+                consumers[consumerId] = consumer;
+                this.DoSend(command);
+                return consumer;
+            }
+            catch(Exception)
+            {
+                if(consumer != null)
+                {
+                    consumer.Close();
+                }
 
-		public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
-		{
+                throw;
+            }
+        }
+
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
+        {
             if (destination == null)
             {
                 throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
             }
-            
+
             ConsumerInfo command = CreateConsumerInfo(destination, selector);
-			ConsumerId consumerId = command.ConsumerId;
-			command.SubscriptionName = name;
-			command.NoLocal = noLocal;
-			MessageConsumer consumer = null;
-
-			try
-			{
-				consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
-				// lets register the consumer first in case we start dispatching messages immediately
-				consumers[consumerId] = consumer;
-				this.DoSend(command);
-			}
-			catch(Exception)
-			{
-				if(consumer != null)
-				{
-					consumer.Close();
-				}
-
-				throw;
-			}
-
-			return consumer;
-		}
-
-		public void DeleteDurableConsumer(string name)
-		{
-			RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
-			command.ConnectionId = Connection.ConnectionId;
-			command.ClientId = Connection.ClientId;
-			command.SubcriptionName = name;
-			this.DoSend(command);
-		}
-
-		public IQueue GetQueue(string name)
-		{
-			return new ActiveMQQueue(name);
-		}
-
-		public ITopic GetTopic(string name)
-		{
-			return new ActiveMQTopic(name);
-		}
-
-		public ITemporaryQueue CreateTemporaryQueue()
-		{
-			ActiveMQTempQueue answer = new ActiveMQTempQueue(Connection.CreateTemporaryDestinationName());
-			CreateTemporaryDestination(answer);
-			return answer;
-		}
-
-		public ITemporaryTopic CreateTemporaryTopic()
-		{
-			ActiveMQTempTopic answer = new ActiveMQTempTopic(Connection.CreateTemporaryDestinationName());
-			CreateTemporaryDestination(answer);
-			return answer;
-		}
-
-		/// <summary>
-		/// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
-		/// </summary>
-		public void DeleteDestination(IDestination destination)
-		{
-			DestinationInfo command = new DestinationInfo();
-			command.ConnectionId = Connection.ConnectionId;
-			command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
-			command.Destination = destination;
-
-			this.DoSend(command);
-		}
-
-		public IMessage CreateMessage()
-		{
-			ActiveMQMessage answer = new ActiveMQMessage();
-			Configure(answer);
-			return answer;
-		}
-
-		public ITextMessage CreateTextMessage()
-		{
-			ActiveMQTextMessage answer = new ActiveMQTextMessage();
-			Configure(answer);
-			return answer;
-		}
-
-		public ITextMessage CreateTextMessage(string text)
-		{
-			ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
-			Configure(answer);
-			return answer;
-		}
-
-		public IMapMessage CreateMapMessage()
-		{
-			return new ActiveMQMapMessage();
-		}
-
-		public IBytesMessage CreateBytesMessage()
-		{
-			return new ActiveMQBytesMessage();
-		}
-
-		public IBytesMessage CreateBytesMessage(byte[] body)
-		{
-			ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
-			answer.Content = body;
-			return answer;
-		}
-
-		public IObjectMessage CreateObjectMessage(object body)
-		{
-			ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
-			answer.Body = body;
-			return answer;
-		}
-
-		public void Commit()
-		{
-			if(!Transacted)
-			{
-				throw new InvalidOperationException(
-						"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
-						+ this.AcknowledgementMode);
-			}
-			this.TransactionContext.Commit();
-		}
-
-		public void Rollback()
-		{
-			if(!Transacted)
-			{
-				throw new InvalidOperationException(
-						"You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
-						+ this.AcknowledgementMode);
-			}
-			this.TransactionContext.Rollback();
-
-			// lets ensure all the consumers redeliver any rolled back messages
-			lock(consumers.SyncRoot)
-			{
-				foreach(MessageConsumer consumer in consumers.Values)
-				{
-					consumer.RedeliverRolledBackMessages();
-				}
-			}
-		}
-
-
-		// Properties
-
-		private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
-		public TimeSpan RequestTimeout
-		{
-			get { return this.requestTimeout; }
-			set { this.requestTimeout = value; }
-		}
-
-		public bool Transacted
-		{
-			get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
-		}
-
-		private AcknowledgementMode acknowledgementMode;
-		public AcknowledgementMode AcknowledgementMode
-		{
-			get { return this.acknowledgementMode; }
-		}
-
-		#endregion
-
-		private void dispatchingThread_ExceptionListener(Exception exception)
-		{
-			if(null != Connection)
-			{
-				try
-				{
-					Connection.OnSessionException(this, exception);
-				}
-				catch
-				{
-				}
-			}
-		}
-
-		protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
-		{
-			DestinationInfo command = new DestinationInfo();
-			command.ConnectionId = Connection.ConnectionId;
-			command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
-			command.Destination = tempDestination;
-
-			this.DoSend(command);
-		}
-
-		public void DoSend(Command message)
-		{
-			this.DoSend(message, this.RequestTimeout);
-		}
-
-		public void DoSend(Command message, TimeSpan requestTimeout)
-		{
-			if(AsyncSend)
-			{
-				Connection.OneWay(message);
-			}
-			else
-			{
-				Connection.SyncRequest(message, requestTimeout);
-			}
-		}
-
-		/// <summary>
-		/// Ensures that a transaction is started
-		/// </summary>
-		public void DoStartTransaction()
-		{
-			if(Transacted)
-			{
-				this.TransactionContext.Begin();
-			}
-		}
-
-		public void DisposeOf(ConsumerId objectId)
-		{
-			Connection.DisposeOf(objectId);
-			if(!this.closing)
-			{
-				consumers.Remove(objectId);
-			}
-		}
-
-		public void DisposeOf(ProducerId objectId)
-		{
-			Connection.DisposeOf(objectId);
-			if(!this.closing)
-			{
-				producers.Remove(objectId);
-			}
-		}
-
-		public bool DispatchMessage(ConsumerId consumerId, Message message)
-		{
-			bool dispatched = false;
-			MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
-
-			if(consumer != null)
-			{
-				consumer.Dispatch((ActiveMQMessage) message);
-				dispatched = true;
-			}
-
-			return dispatched;
-		}
-
-		/// <summary>
-		/// Private method called by the dispatcher thread in order to perform
-		/// asynchronous delivery of queued (inbound) messages.
-		/// </summary>
-		private void DispatchAsyncMessages()
-		{
-			// lets iterate through each consumer created by this session
-			// ensuring that they have all pending messages dispatched
-			lock(consumers.SyncRoot)
-			{
-				foreach(MessageConsumer consumer in consumers.Values)
-				{
-					consumer.DispatchAsyncMessages();
-				}
-			}
-		}
-
-		protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
-		{
-			ConsumerInfo answer = new ConsumerInfo();
-			ConsumerId id = new ConsumerId();
-			id.ConnectionId = info.SessionId.ConnectionId;
-			id.SessionId = info.SessionId.Value;
-			id.Value = Interlocked.Increment(ref consumerCounter);
-			answer.ConsumerId = id;
-			answer.Destination = ActiveMQDestination.Transform(destination);
-			answer.Selector = selector;
-			answer.PrefetchSize = this.PrefetchSize;
-			answer.Priority = this.Priority;
-			answer.Exclusive = this.Exclusive;
-			answer.DispatchAsync = this.DispatchAsync;
-			answer.Retroactive = this.Retroactive;
-
-			// If the destination contained a URI query, then use it to set public properties
-			// on the ConsumerInfo
-			ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-			if(amqDestination != null && amqDestination.Options != null)
-			{
-				URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
-			}
-
-			return answer;
-		}
-
-		protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
-		{
-			ProducerInfo answer = new ProducerInfo();
-			ProducerId id = new ProducerId();
-			id.ConnectionId = info.SessionId.ConnectionId;
-			id.SessionId = info.SessionId.Value;
-			id.Value = Interlocked.Increment(ref producerCounter);
-			answer.ProducerId = id;
-			answer.Destination = ActiveMQDestination.Transform(destination);
-
-			// If the destination contained a URI query, then use it to set public
-			// properties on the ProducerInfo
-			ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-			if(amqDestination != null && amqDestination.Options != null)
-			{
-				URISupport.SetProperties(answer, amqDestination.Options, "producer.");
-			}
-
-			return answer;
-		}
-
-		/// <summary>
-		/// Configures the message command
-		/// </summary>
-		protected void Configure(ActiveMQMessage message)
-		{
-		}
-
-		internal void StopAsyncDelivery()
-		{
-			if(startedAsyncDelivery)
-			{
-				this.dispatchingThread.ExceptionListener -= this.dispatchingThread_ExceptionHandler;
-				dispatchingThread.Stop((int) MAX_THREAD_WAIT.TotalMilliseconds);
-				startedAsyncDelivery = false;
-			}
-		}
-
-		internal void StartAsyncDelivery()
-		{
-			if(!startedAsyncDelivery)
-			{
-				this.dispatchingThread.ExceptionListener += this.dispatchingThread_ExceptionHandler;
-				dispatchingThread.Start();
-				startedAsyncDelivery = true;
-			}
-		}
-
-		internal void RegisterConsumerDispatcher(Dispatcher dispatcher)
-		{
-			dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle);
-		}
-	}
+            ConsumerId consumerId = command.ConsumerId;
+            command.SubscriptionName = name;
+            command.NoLocal = noLocal;
+            MessageConsumer consumer = null;
+
+            try
+            {
+                consumer = new MessageConsumer(this, command, this.AcknowledgementMode);
+                // lets register the consumer first in case we start dispatching messages immediately
+                consumers[consumerId] = consumer;
+                this.DoSend(command);
+            }
+            catch(Exception)
+            {
+                if(consumer != null)
+                {
+                    consumer.Close();
+                }
+
+                throw;
+            }
+
+            return consumer;
+        }
+
+        public void DeleteDurableConsumer(string name)
+        {
+            RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
+            command.ConnectionId = Connection.ConnectionId;
+            command.ClientId = Connection.ClientId;
+            command.SubcriptionName = name;
+            this.DoSend(command);
+        }
+
+        public IQueue GetQueue(string name)
+        {
+            return new ActiveMQQueue(name);
+        }
+
+        public ITopic GetTopic(string name)
+        {
+            return new ActiveMQTopic(name);
+        }
+
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            ActiveMQTempQueue answer = new ActiveMQTempQueue(Connection.CreateTemporaryDestinationName());
+            CreateTemporaryDestination(answer);
+            return answer;
+        }
+
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            ActiveMQTempTopic answer = new ActiveMQTempTopic(Connection.CreateTemporaryDestinationName());
+            CreateTemporaryDestination(answer);
+            return answer;
+        }
+
+        /// <summary>
+        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+        /// </summary>
+        public void DeleteDestination(IDestination destination)
+        {
+            DestinationInfo command = new DestinationInfo();
+            command.ConnectionId = Connection.ConnectionId;
+            command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
+            command.Destination = (ActiveMQDestination) destination;
+
+            this.DoSend(command);
+        }
+
+        public IMessage CreateMessage()
+        {
+            ActiveMQMessage answer = new ActiveMQMessage();
+            Configure(answer);
+            return answer;
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            ActiveMQTextMessage answer = new ActiveMQTextMessage();
+            Configure(answer);
+            return answer;
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
+            Configure(answer);
+            return answer;
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return new ActiveMQMapMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return new ActiveMQBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
+            answer.Content = body;
+            return answer;
+        }
+
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
+            answer.Body = body;
+            return answer;
+        }
+
+        public void Commit()
+        {
+            if(!Transacted)
+            {
+                throw new InvalidOperationException(
+                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+                        + this.AcknowledgementMode);
+            }
+            this.TransactionContext.Commit();
+        }
+
+        public void Rollback()
+        {
+            if(!Transacted)
+            {
+                throw new InvalidOperationException(
+                        "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+                        + this.AcknowledgementMode);
+            }
+            this.TransactionContext.Rollback();
+
+            // lets ensure all the consumers redeliver any rolled back messages
+            lock(consumers.SyncRoot)
+            {
+                foreach(MessageConsumer consumer in consumers.Values)
+                {
+                    consumer.RedeliverRolledBackMessages();
+                }
+            }
+        }
+
+
+        // Properties
+
+        private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout;
+        public TimeSpan RequestTimeout
+        {
+            get { return this.requestTimeout; }
+            set { this.requestTimeout = value; }
+        }
+
+        public bool Transacted
+        {
+            get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; }
+        }
+
+        private AcknowledgementMode acknowledgementMode;
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return this.acknowledgementMode; }
+        }
+
+        #endregion
+
+        private void dispatchingThread_ExceptionListener(Exception exception)
+        {
+            if(null != Connection)
+            {
+                try
+                {
+                    Connection.OnSessionException(this, exception);
+                }
+                catch
+                {
+                }
+            }
+        }
+
+        protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
+        {
+            DestinationInfo command = new DestinationInfo();
+            command.ConnectionId = Connection.ConnectionId;
+            command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
+            command.Destination = tempDestination;
+
+            this.DoSend(command);
+        }
+
+        public void DoSend(Command message)
+        {
+            this.DoSend(message, this.RequestTimeout);
+        }
+
+        public void DoSend(Command message, TimeSpan requestTimeout)
+        {
+            if(AsyncSend)
+            {
+                Connection.OneWay(message);
+            }
+            else
+            {
+                Connection.SyncRequest(message, requestTimeout);
+            }
+        }
+
+        /// <summary>
+        /// Ensures that a transaction is started
+        /// </summary>
+        public void DoStartTransaction()
+        {
+            if(Transacted)
+            {
+                this.TransactionContext.Begin();
+            }
+        }
+
+        public void DisposeOf(ConsumerId objectId)
+        {
+            Connection.DisposeOf(objectId);
+            if(!this.closing)
+            {
+                consumers.Remove(objectId);
+            }
+        }
+
+        public void DisposeOf(ProducerId objectId)
+        {
+            Connection.DisposeOf(objectId);
+            if(!this.closing)
+            {
+                producers.Remove(objectId);
+            }
+        }
+
+        public bool DispatchMessage(ConsumerId consumerId, Message message)
+        {
+            bool dispatched = false;
+            MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
+
+            if(consumer != null)
+            {
+                consumer.Dispatch((ActiveMQMessage) message);
+                dispatched = true;
+            }
+
+            return dispatched;
+        }
+
+        /// <summary>
+        /// Private method called by the dispatcher thread in order to perform
+        /// asynchronous delivery of queued (inbound) messages.
+        /// </summary>
+        private void DispatchAsyncMessages()
+        {
+            // Iterate through each consumer created by this session,
+            // ensuring that each one has all of its pending messages dispatched.
+            lock(consumers.SyncRoot)
+            {
+                foreach(MessageConsumer consumer in consumers.Values)
+                {
+                    consumer.DispatchAsyncMessages();
+                }
+            }
+        }
+
+        protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
+        {
+            ConsumerInfo answer = new ConsumerInfo();
+            ConsumerId id = new ConsumerId();
+            id.ConnectionId = info.SessionId.ConnectionId;
+            id.SessionId = info.SessionId.Value;
+            id.Value = Interlocked.Increment(ref consumerCounter);
+            answer.ConsumerId = id;
+            answer.Destination = ActiveMQDestination.Transform(destination);
+            answer.Selector = selector;
+            answer.PrefetchSize = this.PrefetchSize;
+            answer.Priority = this.Priority;
+            answer.Exclusive = this.Exclusive;
+            answer.DispatchAsync = this.DispatchAsync;
+            answer.Retroactive = this.Retroactive;
+
+            // If the destination contained a URI query, then use it to set public properties
+            // on the ConsumerInfo
+            ActiveMQDestination amqDestination = destination as ActiveMQDestination;
+            if(amqDestination != null && amqDestination.Options != null)
+            {
+                URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
+            }
+
+            return answer;
+        }
+
+        protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
+        {
+            ProducerInfo answer = new ProducerInfo();
+            ProducerId id = new ProducerId();
+            id.ConnectionId = info.SessionId.ConnectionId;
+            id.SessionId = info.SessionId.Value;
+            id.Value = Interlocked.Increment(ref producerCounter);
+            answer.ProducerId = id;
+            answer.Destination = ActiveMQDestination.Transform(destination);
+
+            // If the destination contained a URI query, then use it to set public
+            // properties on the ProducerInfo
+            ActiveMQDestination amqDestination = destination as ActiveMQDestination;
+            if(amqDestination != null && amqDestination.Options != null)
+            {
+                URISupport.SetProperties(answer, amqDestination.Options, "producer.");
+            }
+
+            return answer;
+        }
+
+        /// <summary>
+        /// Configures the message command. The current implementation is empty and performs no configuration.
+        /// </summary>
+        protected void Configure(ActiveMQMessage message)
+        {
+        }
+
+        internal void StopAsyncDelivery()
+        {
+            if(startedAsyncDelivery)
+            {
+                this.dispatchingThread.ExceptionListener -= this.dispatchingThread_ExceptionHandler;
+                dispatchingThread.Stop((int) MAX_THREAD_WAIT.TotalMilliseconds);
+                startedAsyncDelivery = false;
+            }
+        }
+
+        internal void StartAsyncDelivery()
+        {
+            if(!startedAsyncDelivery)
+            {
+                this.dispatchingThread.ExceptionListener += this.dispatchingThread_ExceptionHandler;
+                dispatchingThread.Start();
+                startedAsyncDelivery = true;
+            }
+        }
+
+        internal void RegisterConsumerDispatcher(Dispatcher dispatcher)
+        {
+            dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle);
+        }
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs Thu Jul 30 19:06:34 2009
@@ -19,178 +19,188 @@
 
 namespace Apache.NMS.ActiveMQ.State
 {
-	public class CommandVisitorAdapter : ICommandVisitor
-	{
+    public class CommandVisitorAdapter : ICommandVisitor
+    {
 
-		public virtual Response processAddConnection(ConnectionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processAddConsumer(ConsumerInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processAddDestination(DestinationInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processAddProducer(ProducerInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processAddSession(SessionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processBeginTransaction(TransactionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processBrokerInfo(BrokerInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processCommitTransactionOnePhase(TransactionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processCommitTransactionTwoPhase(TransactionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processEndTransaction(TransactionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processFlush(FlushCommand command)
-		{
-			return null;
-		}
-
-		public virtual Response processForgetTransaction(TransactionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processKeepAlive(KeepAliveInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processMessage(Message send)
-		{
-			return null;
-		}
-
-		public virtual Response processMessageAck(MessageAck ack)
-		{
-			return null;
-		}
-
-		public virtual Response processMessageDispatchNotification(MessageDispatchNotification notification)
-		{
-			return null;
-		}
-
-		public virtual Response processMessagePull(MessagePull pull)
-		{
-			return null;
-		}
-
-		public virtual Response processPrepareTransaction(TransactionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processProducerAck(ProducerAck ack)
-		{
-			return null;
-		}
-
-		public virtual Response processRecoverTransactions(TransactionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processRemoveConnection(ConnectionId id)
-		{
-			return null;
-		}
-
-		public virtual Response processRemoveConsumer(ConsumerId id)
-		{
-			return null;
-		}
-
-		public virtual Response processRemoveDestination(DestinationInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processRemoveProducer(ProducerId id)
-		{
-			return null;
-		}
-
-		public virtual Response processRemoveSession(SessionId id)
-		{
-			return null;
-		}
-
-		public virtual Response processRemoveSubscription(RemoveSubscriptionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processRollbackTransaction(TransactionInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processShutdown(ShutdownInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processWireFormat(WireFormatInfo info)
-		{
-			return null;
-		}
-
-		public virtual Response processMessageDispatch(MessageDispatch dispatch)
-		{
-			return null;
-		}
-
-		public virtual Response processControlCommand(ControlCommand command)
-		{
-			return null;
-		}
-
-		public virtual Response processConnectionControl(ConnectionControl control)
-		{
-			return null;
-		}
-
-		public virtual Response processConnectionError(ConnectionError error)
-		{
-			return null;
-		}
-
-		public virtual Response processConsumerControl(ConsumerControl control)
-		{
-			return null;
-		}
+        public virtual Response processAddConnection(ConnectionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processAddConsumer(ConsumerInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processAddDestination(DestinationInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processAddProducer(ProducerInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processAddSession(SessionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processBeginTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processBrokerInfo(BrokerInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processCommitTransactionOnePhase(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processCommitTransactionTwoPhase(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processEndTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processFlushCommand(FlushCommand command)
+        {
+            return null;
+        }
+
+        public virtual Response processForgetTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processKeepAliveInfo(KeepAliveInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processMessage(Message send)
+        {
+            return null;
+        }
+
+        public virtual Response processMessageAck(MessageAck ack)
+        {
+            return null;
+        }
+
+        public virtual Response processMessageDispatchNotification(MessageDispatchNotification notification)
+        {
+            return null;
+        }
+
+        public virtual Response processMessagePull(MessagePull pull)
+        {
+            return null;
+        }
+
+        public virtual Response processPrepareTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processProducerAck(ProducerAck ack)
+        {
+            return null;
+        }
+
+        public virtual Response processRecoverTransactions(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveConnection(ConnectionId id)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveConsumer(ConsumerId id)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveDestination(DestinationInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveProducer(ProducerId id)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveSession(SessionId id)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processRollbackTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processShutdownInfo(ShutdownInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processWireFormat(WireFormatInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processMessageDispatch(MessageDispatch dispatch)
+        {
+            return null;
+        }
+
+        public virtual Response processControlCommand(ControlCommand command)
+        {
+            return null;
+        }
+
+        public virtual Response processConnectionControl(ConnectionControl control)
+        {
+            return null;
+        }
+
+        public virtual Response processConnectionError(ConnectionError error)
+        {
+            return null;
+        }
+
+        public virtual Response processConsumerControl(ConsumerControl control)
+        {
+            return null;
+        }
+
+        public virtual Response processResponse(Response response)
+        {
+            return null;
+        }
+
+        public virtual Response processReplayCommand(ReplayCommand replayCommand)
+        {
+           return null;
+        }
 
-	}
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs Thu Jul 30 19:06:34 2009
@@ -20,76 +20,80 @@
 
 namespace Apache.NMS.ActiveMQ.State
 {
-	public interface ICommandVisitor
-	{
+    public interface ICommandVisitor
+    {
 
-		Response processAddConnection(ConnectionInfo info);
+        Response processAddConnection(ConnectionInfo info);
 
-		Response processAddSession(SessionInfo info);
+        Response processAddSession(SessionInfo info);
 
-		Response processAddProducer(ProducerInfo info);
+        Response processAddProducer(ProducerInfo info);
 
-		Response processAddConsumer(ConsumerInfo info);
+        Response processAddConsumer(ConsumerInfo info);
 
-		Response processRemoveConnection(ConnectionId id);
+        Response processRemoveConnection(ConnectionId id);
 
-		Response processRemoveSession(SessionId id);
+        Response processRemoveSession(SessionId id);
 
-		Response processRemoveProducer(ProducerId id);
+        Response processRemoveProducer(ProducerId id);
 
-		Response processRemoveConsumer(ConsumerId id);
+        Response processRemoveConsumer(ConsumerId id);
 
-		Response processAddDestination(DestinationInfo info);
+        Response processAddDestination(DestinationInfo info);
 
-		Response processRemoveDestination(DestinationInfo info);
+        Response processRemoveDestination(DestinationInfo info);
 
-		Response processRemoveSubscription(RemoveSubscriptionInfo info);
+        Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info);
 
-		Response processMessage(Message send);
+        Response processMessage(Message send);
 
-		Response processMessageAck(MessageAck ack);
+        Response processMessageAck(MessageAck ack);
 
-		Response processMessagePull(MessagePull pull);
+        Response processMessagePull(MessagePull pull);
 
-		Response processBeginTransaction(TransactionInfo info);
+        Response processBeginTransaction(TransactionInfo info);
 
-		Response processPrepareTransaction(TransactionInfo info);
+        Response processPrepareTransaction(TransactionInfo info);
 
-		Response processCommitTransactionOnePhase(TransactionInfo info);
+        Response processCommitTransactionOnePhase(TransactionInfo info);
 
-		Response processCommitTransactionTwoPhase(TransactionInfo info);
+        Response processCommitTransactionTwoPhase(TransactionInfo info);
 
-		Response processRollbackTransaction(TransactionInfo info);
+        Response processRollbackTransaction(TransactionInfo info);
 
-		Response processWireFormat(WireFormatInfo info);
+        Response processWireFormat(WireFormatInfo info);
 
-		Response processKeepAlive(KeepAliveInfo info);
+        Response processKeepAliveInfo(KeepAliveInfo info);
 
-		Response processShutdown(ShutdownInfo info);
+        Response processShutdownInfo(ShutdownInfo info);
 
-		Response processFlush(FlushCommand command);
+        Response processFlushCommand(FlushCommand command);
 
-		Response processBrokerInfo(BrokerInfo info);
+        Response processBrokerInfo(BrokerInfo info);
 
-		Response processRecoverTransactions(TransactionInfo info);
+        Response processRecoverTransactions(TransactionInfo info);
 
-		Response processForgetTransaction(TransactionInfo info);
+        Response processForgetTransaction(TransactionInfo info);
 
-		Response processEndTransaction(TransactionInfo info);
+        Response processEndTransaction(TransactionInfo info);
 
-		Response processMessageDispatchNotification(MessageDispatchNotification notification);
+        Response processMessageDispatchNotification(MessageDispatchNotification notification);
 
-		Response processProducerAck(ProducerAck ack);
+        Response processProducerAck(ProducerAck ack);
 
-		Response processMessageDispatch(MessageDispatch dispatch);
+        Response processMessageDispatch(MessageDispatch dispatch);
 
-		Response processControlCommand(ControlCommand command);
+        Response processControlCommand(ControlCommand command);
 
-		Response processConnectionError(ConnectionError error);
+        Response processConnectionError(ConnectionError error);
 
-		Response processConnectionControl(ConnectionControl control);
+        Response processConnectionControl(ConnectionControl control);
 
-		Response processConsumerControl(ConsumerControl control);
+        Response processConsumerControl(ConsumerControl control);
 
-	}
+        Response processResponse(Response response);
+
+        Response processReplayCommand(ReplayCommand replayCommand);
+
+    }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs Thu Jul 30 19:06:34 2009
@@ -24,84 +24,84 @@
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
-	/// <summary>
-	/// A Transport which negotiates the wire format
-	/// </summary>
-	public class WireFormatNegotiator : TransportFilter
-	{
-		private OpenWireFormat wireFormat;
-		private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
-
-		private AtomicBoolean firstStart=new AtomicBoolean(true);
-		private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
-		private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
-
-		public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
-			: base(next)
-		{
-			this.wireFormat = wireFormat;
-		}
-
-		public override void Start()
-		{
-			base.Start();
-			if (firstStart.CompareAndSet(true, false))
-			{
-				try
-				{
-					next.Oneway(wireFormat.PreferedWireFormatInfo);
-				}
-				finally
-				{
-					wireInfoSentDownLatch.countDown();
-				}
-			}
-		}
-
-		protected override void Dispose(bool disposing)
-		{
-			base.Dispose(disposing);
-			readyCountDownLatch.countDown();
-		}
-
-		public override void Oneway(Command command)
-		{
-			if (!readyCountDownLatch.await(negotiateTimeout))
-				throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
-			next.Oneway(command);
-		}
-
-		protected override void OnCommand(ITransport sender, Command command)
-		{
-			if ( command.GetDataStructureType() == WireFormatInfo.ID_WireFormatInfo )
-			{
-				WireFormatInfo info = (WireFormatInfo)command;
-				try
-				{
-					if (!info.Valid)
-					{
-						throw new IOException("Remote wire format magic is invalid");
-					}
-					wireInfoSentDownLatch.await(negotiateTimeout);
-					wireFormat.renegotiateWireFormat(info);
-				}
-				catch (Exception e)
-				{
-					OnException(this, e);
-				}
-				finally
-				{
-					readyCountDownLatch.countDown();
-				}
-			}
-			this.commandHandler(sender, command);
-		}
-
-		protected override void OnException(ITransport sender, Exception command)
-		{
-			readyCountDownLatch.countDown();
-			this.exceptionHandler(sender, command);
-		}
-	}
+    /// <summary>
+    /// A Transport which negotiates the wire format
+    /// </summary>
+    public class WireFormatNegotiator : TransportFilter
+    {
+        private OpenWireFormat wireFormat;
+        private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
+
+        private AtomicBoolean firstStart=new AtomicBoolean(true);
+        private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
+        private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
+
+        public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
+            : base(next)
+        {
+            this.wireFormat = wireFormat;
+        }
+
+        public override void Start()
+        {
+            base.Start();
+            if (firstStart.CompareAndSet(true, false))
+            {
+                try
+                {
+                    next.Oneway(wireFormat.PreferedWireFormatInfo);
+                }
+                finally
+                {
+                    wireInfoSentDownLatch.countDown();
+                }
+            }
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            base.Dispose(disposing);
+            readyCountDownLatch.countDown();
+        }
+
+        public override void Oneway(Command command)
+        {
+            if (!readyCountDownLatch.await(negotiateTimeout))
+                throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
+            next.Oneway(command);
+        }
+
+        protected override void OnCommand(ITransport sender, Command command)
+        {
+            if ( command.IsWireFormatInfo )
+            {
+                WireFormatInfo info = (WireFormatInfo)command;
+                try
+                {
+                    if (!info.Valid)
+                    {
+                        throw new IOException("Remote wire format magic is invalid");
+                    }
+                    wireInfoSentDownLatch.await(negotiateTimeout);
+                    wireFormat.renegotiateWireFormat(info);
+                }
+                catch (Exception e)
+                {
+                    OnException(this, e);
+                }
+                finally
+                {
+                    readyCountDownLatch.countDown();
+                }
+            }
+            this.commandHandler(sender, command);
+        }
+
+        protected override void OnException(ITransport sender, Exception command)
+        {
+            readyCountDownLatch.countDown();
+            this.exceptionHandler(sender, command);
+        }
+    }
 }
 



Mime
View raw message