activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r726083 [1/2] - in /activemq/activemq-dotnet/Apache.NMS.WCF/trunk: ./ src/main/csharp/ src/main/csharp/Configuration/
Date Fri, 12 Dec 2008 18:25:52 GMT
Author: jgomes
Date: Fri Dec 12 10:25:52 2008
New Revision: 726083

URL: http://svn.apache.org/viewvc?rev=726083&view=rev
Log:
Applying patch file from David that adds support for Sessions in the WCF component.  Thanks, David!
Fixes [AMQNET-127]. (See https://issues.apache.org/activemq/browse/AMQNET-127)

Added:
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs
Removed:
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelBase.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelListener.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs
    activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/Configuration/NmsTransportBindingElement.cs Fri Dec 12 10:25:52 2008
@@ -26,12 +26,20 @@
 namespace Apache.NMS.WCF
 {
 	/// <summary>
-	/// key class to specify the custom transport class and its schema.
-	/// Its key role in the WCF is to be the 'factory of factories'.  It determines what shape
-	/// the channel will be.  In this case by returning channel factory (for the client) that returns a IOutputChannel
-	/// and a channel listener (for the server) that returns a IInputChannel, this class determines
-	/// that this implementation is a datagram 'shape'.
+	/// Key class to specify the custom transport class and its schema.
 	/// </summary>
+	/// <remarks>
+	/// <para>
+	/// Its key role in the WCF is to be the 'factory of factories'.  It determines what shape
+	/// the channel will be. In this case, by returning a channel factory (for the client) that returns an
+	/// <see cref="IOutputChannel" /> or <see cref="IOutputSessionChannel" />, and a channel listener 
+	/// (for the server) that returns an <see cref="IInputChannel" /> or <see cref="IInputSessionChannel" />,
+	/// this class determines that this implementation is a datagram 'shape'.
+	/// </para>
+	/// <para>
+	/// The request/reply channel shape is not supported by this transport.
+	/// </para>
+	/// </remarks>
 	public class NmsTransportBindingElement : TransportBindingElement, IWsdlExportExtension, IPolicyExportExtension
 	{
 		#region Constructors
@@ -85,31 +93,31 @@
 
 		/// <summary>
 		/// Determines whether this instance can build a channel factory in the specified context.
-		/// In this case an implementation of IOutputChannel.
+		/// Only implementations of <see cref="IOutputChannel" /> and <see cref="IOutputSessionChannel" /> are supported.
 		/// </summary>
 		/// <typeparam name="TChannel">The type of the channel.</typeparam>
 		/// <param name="context">The context.</param>
 		/// <returns>
-		/// 	<c>true</c> if this instance [can build channel factory] the specified context; otherwise, <c>false</c>.
+		/// 	<c>true</c> if the requested channel factory can be built; otherwise, <c>false</c>.
 		/// </returns>
 		public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
 		{
-			return (typeof(TChannel) == typeof(IOutputChannel));
+			return (typeof(TChannel) == typeof(IOutputChannel) || typeof(TChannel) == typeof(IOutputSessionChannel));
 		}
 
 		/// <summary>
 		/// Determines whether this instance can build a channel listener in the specified context.
-		/// In this case in implementation that will return an IInputChannel.
+		/// Only implementations of <see cref="IInputChannel" /> and <see cref="IInputSessionChannel" /> are supported.
 		/// </summary>
 		/// <typeparam name="TChannel">The type of the channel.</typeparam>
 		/// <param name="context">The context.</param>
 		/// <returns>
-		/// 	<c>true</c> if this instance [can build channel listener] the specified context; otherwise, <c>false</c>.
+		/// 	<c>true</c> if the requested channel listener can be built; otherwise, <c>false</c>.
 		/// </returns>
 		/// <exception cref="ArgumentException">the requested channel does not implement <see cref="IReplyChannel" />.</exception>
 		public override bool CanBuildChannelListener<TChannel>(BindingContext context)
 		{
-			return (typeof(TChannel) == typeof(IInputChannel));
+			return (typeof(TChannel) == typeof(IInputChannel) || typeof(TChannel) == typeof(IInputSessionChannel));
 		}
 
 		/// <summary>
@@ -129,7 +137,7 @@
 			{
 				throw new ArgumentException(String.Format("Unsupported channel type: {0}.", typeof(TChannel).Name));
 			}
-			return (IChannelFactory<TChannel>) new NmsChannelFactory(this, context);
+			return (IChannelFactory<TChannel>) new NmsChannelFactory<TChannel>(this, context);
 		}
 
 		/// <summary>
@@ -144,11 +152,17 @@
 			{
 				throw new ArgumentNullException("context");
 			}
+
 			if(!CanBuildChannelListener<TChannel>(context))
 			{
 				throw new ArgumentException(String.Format("Unsupported channel type: {0}.", typeof(TChannel).Name));
 			}
-			return (IChannelListener<TChannel>) new NmsChannelListener(this, context);
+
+			if (typeof(TChannel) == typeof(IInputSessionChannel))
+			{
+				return (IChannelListener<TChannel>)new NmsInputSessionChannelListener(this, context);
+			}
+			return (IChannelListener<TChannel>) new NmsInputChannelListener(this, context);
 		}
 
 		/// <summary>

Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs Fri Dec 12 10:25:52 2008
@@ -24,24 +24,24 @@
 namespace Apache.NMS.WCF
 {
 	// ItemDequeuedCallback is called as an item is dequeued from the InputQueue.  The 
-	// InputQueue lock is not held during the callback.  However, the user code is
-	// not notified of the item being available until the callback returns.  If you
-	// are not sure if the callback blocks for a long time, then first call 
+	// InputQueue lock is not held during the callback.  However, the user code will
+	// not be notified of the item being available until the callback returns.  If you
+	// are not sure if the callback will block for a long time, then first call 
 	// IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
-	internal delegate void ItemDequeuedCallback();
+	delegate void ItemDequeuedCallback();
 
 	/// <summary>
 	/// Handles asynchronous interactions between producers and consumers. 
 	/// Producers can dispatch available data to the input queue, 
-	/// where it is dispatched to a waiting consumer or stored until a
+	/// where it will be dispatched to a waiting consumer or stored until a
 	/// consumer becomes available. Consumers can synchronously or asynchronously
-	/// request data from the queue, which is returned when data becomes
+	/// request data from the queue, which will be returned when data becomes
 	/// available.
 	/// </summary>
 	/// <typeparam name="T">The concrete type of the consumer objects that are waiting for data.</typeparam>
-	internal class InputQueue<T> : IDisposable where T : class
+	class InputQueue<T> : IDisposable where T : class
 	{
-		//Stores items that are waiting to be accessed.
+		//Stores items that are waiting to be consumed.
 		ItemQueue itemQueue;
 
 		//Each IQueueReader represents some consumer that is waiting for
@@ -51,7 +51,7 @@
 
 		//Each IQueueWaiter represents some waiter that is waiting for
 		//items to appear in the queue.  When any item appears, all
-		//waiters are signaled.
+		//waiters are signalled.
 		List<IQueueWaiter> waiterList;
 
 		static WaitCallback onInvokeDequeuedCallback;
@@ -60,9 +60,9 @@
 		static WaitCallback completeWaitersFalseCallback;
 		static WaitCallback completeWaitersTrueCallback;
 
-		//Represents the current state of the InputQueue.
+		//Represents the current state of the InputQueue
 		//as it transitions through its lifecycle.
-		QueueState _queueState;
+		QueueState queueState;
 		enum QueueState
 		{
 			Open,
@@ -70,15 +70,12 @@
 			Closed
 		}
 
-		/// <summary>
-		/// Initializes a new instance of the <see cref="InputQueue&lt;T&gt;"/> class.
-		/// </summary>
 		public InputQueue()
 		{
-			itemQueue = new ItemQueue();
-			readerQueue = new Queue<IQueueReader>();
-			waiterList = new List<IQueueWaiter>();
-			_queueState = QueueState.Open;
+			this.itemQueue = new ItemQueue();
+			this.readerQueue = new Queue<IQueueReader>();
+			this.waiterList = new List<IQueueWaiter>();
+			this.queueState = QueueState.Open;
 		}
 
 		public int PendingCount
@@ -92,6 +89,33 @@
 			}
 		}
 
+		// added by Roman
+		public int NumberOfReaders
+		{
+			get
+			{
+				lock(ThisLock)
+				{
+					return readerQueue.Count;
+				}
+			}
+		}
+		public void Open()
+		{
+			lock(ThisLock)
+			{
+				if(queueState == QueueState.Open)
+				{
+					return;
+				}
+
+				if(queueState == QueueState.Closed)
+				{
+					throw new ObjectDisposedException(this.GetType().ToString());
+				}
+			}
+		}
+
 		object ThisLock
 		{
 			get { return itemQueue; }
@@ -103,7 +127,7 @@
 
 			lock(ThisLock)
 			{
-				if(_queueState == QueueState.Open)
+				if(queueState == QueueState.Open)
 				{
 					if(itemQueue.HasAvailableItem)
 					{
@@ -116,7 +140,7 @@
 						return reader;
 					}
 				}
-				else if(_queueState == QueueState.Shutdown)
+				else if(queueState == QueueState.Shutdown)
 				{
 					if(itemQueue.HasAvailableItem)
 					{
@@ -139,7 +163,7 @@
 		{
 			lock(ThisLock)
 			{
-				if(_queueState == QueueState.Open)
+				if(queueState == QueueState.Open)
 				{
 					if(!itemQueue.HasAvailableItem)
 					{
@@ -148,7 +172,7 @@
 						return waiter;
 					}
 				}
-				else if(_queueState == QueueState.Shutdown)
+				else if(queueState == QueueState.Shutdown)
 				{
 					if(!itemQueue.HasAvailableItem && itemQueue.HasAnyItem)
 					{
@@ -196,7 +220,7 @@
 			{
 				if(completeWaitersTrueCallback == null)
 				{
-					completeWaitersTrueCallback = CompleteWaitersTrueCallback;
+					completeWaitersTrueCallback = new WaitCallback(CompleteWaitersTrueCallback);
 				}
 
 				ThreadPool.QueueUserWorkItem(completeWaitersTrueCallback, waiters);
@@ -205,7 +229,7 @@
 			{
 				if(completeWaitersFalseCallback == null)
 				{
-					completeWaitersFalseCallback = CompleteWaitersFalseCallback;
+					completeWaitersFalseCallback = new WaitCallback(CompleteWaitersFalseCallback);
 				}
 
 				ThreadPool.QueueUserWorkItem(completeWaitersFalseCallback, waiters);
@@ -235,19 +259,19 @@
 			IQueueReader[] outstandingReaders = null;
 			lock(ThisLock)
 			{
-				if(_queueState == QueueState.Shutdown)
+				if(queueState == QueueState.Shutdown)
 				{
 					return;
 				}
 
-				if(_queueState == QueueState.Closed)
+				if(queueState == QueueState.Closed)
 				{
 					return;
 				}
 
-				_queueState = QueueState.Shutdown;
+				this.queueState = QueueState.Shutdown;
 
-				if(readerQueue.Count > 0 && itemQueue.ItemCount == 0)
+				if(readerQueue.Count > 0 && this.itemQueue.ItemCount == 0)
 				{
 					outstandingReaders = new IQueueReader[readerQueue.Count];
 					readerQueue.CopyTo(outstandingReaders, 0);
@@ -268,7 +292,7 @@
 		{
 			T value;
 
-			if(!Dequeue(timeout, out value))
+			if(!this.Dequeue(timeout, out value))
 			{
 				throw new TimeoutException(string.Format("Dequeue timed out in {0}.", timeout));
 			}
@@ -283,7 +307,7 @@
 
 			lock(ThisLock)
 			{
-				if(_queueState == QueueState.Open)
+				if(queueState == QueueState.Open)
 				{
 					if(itemQueue.HasAvailableItem)
 					{
@@ -295,7 +319,7 @@
 						readerQueue.Enqueue(reader);
 					}
 				}
-				else if(_queueState == QueueState.Shutdown)
+				else if(queueState == QueueState.Shutdown)
 				{
 					if(itemQueue.HasAvailableItem)
 					{
@@ -323,15 +347,18 @@
 			{
 				return reader.Wait(timeout, out value);
 			}
-
-			InvokeDequeuedCallback(item.DequeuedCallback);
-			value = item.GetValue();
-			return true;
+			else
+			{
+				InvokeDequeuedCallback(item.DequeuedCallback);
+				value = item.GetValue();
+				return true;
+			}
 		}
 
 		public void Dispose()
 		{
 			Dispose(true);
+
 			GC.SuppressFinalize(this);
 		}
 
@@ -343,9 +370,9 @@
 
 				lock(ThisLock)
 				{
-					if(_queueState != QueueState.Closed)
+					if(queueState != QueueState.Closed)
 					{
-						_queueState = QueueState.Closed;
+						queueState = QueueState.Closed;
 						dispose = true;
 					}
 				}
@@ -378,10 +405,10 @@
 
 			lock(ThisLock)
 			{
-				itemAvailable = !((_queueState == QueueState.Closed) || (_queueState == QueueState.Shutdown));
-				GetWaiters(out waiters);
+				itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
+				this.GetWaiters(out waiters);
 
-				if(_queueState != QueueState.Closed)
+				if(queueState != QueueState.Closed)
 				{
 					itemQueue.MakePendingItemAvailable();
 
@@ -390,7 +417,7 @@
 						item = itemQueue.DequeueAvailableItem();
 						reader = readerQueue.Dequeue();
 
-						if(_queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
+						if(queueState == QueueState.Shutdown && readerQueue.Count > 0 && itemQueue.ItemCount == 0)
 						{
 							outstandingReaders = new IQueueReader[readerQueue.Count];
 							readerQueue.CopyTo(outstandingReaders, 0);
@@ -405,7 +432,9 @@
 			if(outstandingReaders != null)
 			{
 				if(completeOutstandingReadersCallback == null)
-					completeOutstandingReadersCallback = CompleteOutstandingReadersCallback;
+				{
+					completeOutstandingReadersCallback = new WaitCallback(CompleteOutstandingReadersCallback);
+				}
 
 				ThreadPool.QueueUserWorkItem(completeOutstandingReadersCallback, outstandingReaders);
 			}
@@ -427,7 +456,7 @@
 		{
 			T value;
 
-			if(!EndDequeue(result, out value))
+			if(!this.EndDequeue(result, out value))
 			{
 				throw new TimeoutException("Asynchronous Dequeue operation timed out.");
 			}
@@ -467,6 +496,7 @@
 		public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback)
 		{
 			EnqueueAndDispatch(item, dequeuedCallback, true);
+			//EnqueueAndDispatch(item, dequeuedCallback, false);
 		}
 
 		public void EnqueueAndDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
@@ -491,10 +521,10 @@
 
 			lock(ThisLock)
 			{
-				itemAvailable = !((_queueState == QueueState.Closed) || (_queueState == QueueState.Shutdown));
-				GetWaiters(out waiters);
+				itemAvailable = !((queueState == QueueState.Closed) || (queueState == QueueState.Shutdown));
+				this.GetWaiters(out waiters);
 
-				if(_queueState == QueueState.Open)
+				if(queueState == QueueState.Open)
 				{
 					if(canDispatchOnThisThread)
 					{
@@ -548,7 +578,7 @@
 			{
 				if(onDispatchCallback == null)
 				{
-					onDispatchCallback = OnDispatchCallback;
+					onDispatchCallback = new WaitCallback(OnDispatchCallback);
 				}
 
 				ThreadPool.QueueUserWorkItem(onDispatchCallback, this);
@@ -572,22 +602,25 @@
 			return EnqueueWithoutDispatch(new Item(exception, dequeuedCallback));
 		}
 
-		// This does not block, however, Dispatch() must be called later if this function
+		// This will not block, however, Dispatch() must be called later if this function
 		// returns true.
 		bool EnqueueWithoutDispatch(Item item)
 		{
 			lock(ThisLock)
 			{
 				// Open
-				if(_queueState != QueueState.Closed && _queueState != QueueState.Shutdown)
+				if(queueState != QueueState.Closed && queueState != QueueState.Shutdown)
 				{
 					if(readerQueue.Count == 0)
 					{
 						itemQueue.EnqueueAvailableItem(item);
 						return false;
 					}
-					itemQueue.EnqueuePendingItem(item);
-					return true;
+					else
+					{
+						itemQueue.EnqueuePendingItem(item);
+						return true;
+					}
 				}
 			}
 
@@ -632,14 +665,14 @@
 		{
 			lock(ThisLock)
 			{
-				if(_queueState == QueueState.Open || _queueState == QueueState.Shutdown)
+				if(queueState == QueueState.Open || queueState == QueueState.Shutdown)
 				{
 					bool removed = false;
 
 					for(int i = readerQueue.Count; i > 0; i--)
 					{
 						IQueueReader temp = readerQueue.Dequeue();
-						if(ReferenceEquals(temp, reader))
+						if(Object.ReferenceEquals(temp, reader))
 						{
 							removed = true;
 						}
@@ -663,7 +696,7 @@
 
 			lock(ThisLock)
 			{
-				if(_queueState == QueueState.Open)
+				if(queueState == QueueState.Open)
 				{
 					if(itemQueue.HasAvailableItem)
 					{
@@ -675,7 +708,7 @@
 						waiterList.Add(waiter);
 					}
 				}
-				else if(_queueState == QueueState.Shutdown)
+				else if(queueState == QueueState.Shutdown)
 				{
 					if(itemQueue.HasAvailableItem)
 					{
@@ -697,7 +730,14 @@
 				}
 			}
 
-			return waiter != null ? waiter.Wait(timeout) : itemAvailable;
+			if(waiter != null)
+			{
+				return waiter.Wait(timeout);
+			}
+			else
+			{
+				return itemAvailable;
+			}
 		}
 
 		interface IQueueReader
@@ -712,23 +752,23 @@
 
 		class WaitQueueReader : IQueueReader
 		{
-			Exception _exception;
-			InputQueue<T> _inputQueue;
-			T _item;
-			ManualResetEvent _waitEvent;
-			object _thisLock = new object();
+			Exception exception;
+			InputQueue<T> inputQueue;
+			T item;
+			ManualResetEvent waitEvent;
+			object thisLock = new object();
 
 			public WaitQueueReader(InputQueue<T> inputQueue)
 			{
-				_inputQueue = inputQueue;
-				_waitEvent = new ManualResetEvent(false);
+				this.inputQueue = inputQueue;
+				waitEvent = new ManualResetEvent(false);
 			}
 
 			object ThisLock
 			{
 				get
 				{
-					return _thisLock;
+					return this.thisLock;
 				}
 			}
 
@@ -736,12 +776,12 @@
 			{
 				lock(ThisLock)
 				{
-					Debug.Assert(_item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
-					Debug.Assert(_exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
+					Debug.Assert(this.item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
+					Debug.Assert(this.exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
 
-					_exception = item.Exception;
-					_item = item.Value;
-					_waitEvent.Set();
+					this.exception = item.Exception;
+					this.item = item.Value;
+					waitEvent.Set();
 				}
 			}
 
@@ -752,11 +792,11 @@
 				{
 					if(timeout == TimeSpan.MaxValue)
 					{
-						_waitEvent.WaitOne();
+						waitEvent.WaitOne();
 					}
-					else if(!_waitEvent.WaitOne(timeout, false))
+					else if(!waitEvent.WaitOne(timeout, false))
 					{
-						if(_inputQueue.RemoveReader(this))
+						if(this.inputQueue.RemoveReader(this))
 						{
 							value = default(T);
 							isSafeToClose = true;
@@ -764,7 +804,7 @@
 						}
 						else
 						{
-							_waitEvent.WaitOne();
+							waitEvent.WaitOne();
 						}
 					}
 
@@ -774,31 +814,31 @@
 				{
 					if(isSafeToClose)
 					{
-						_waitEvent.Close();
+						waitEvent.Close();
 					}
 				}
 
-				value = _item;
+				value = item;
 				return true;
 			}
 		}
 
-		class AsyncQueueReader : AsyncResult, IQueueReader
+		public class AsyncQueueReader : AsyncResult, IQueueReader
 		{
 			static TimerCallback timerCallback = new TimerCallback(AsyncQueueReader.TimerCallback);
 
-			bool _expired;
-			InputQueue<T> _inputQueue;
-			T _item;
-			Timer _timer;
+			bool expired;
+			InputQueue<T> inputQueue;
+			T item;
+			Timer timer;
 
 			public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
 				: base(callback, state)
 			{
-				_inputQueue = inputQueue;
+				this.inputQueue = inputQueue;
 				if(timeout != TimeSpan.MaxValue)
 				{
-					_timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
+					this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
 				}
 			}
 
@@ -806,262 +846,190 @@
 			{
 				AsyncQueueReader readerResult = AsyncResult.End<AsyncQueueReader>(result);
 
-				if(readerResult._expired)
+				if(readerResult.expired)
 				{
 					value = default(T);
 					return false;
 				}
-
-				value = readerResult._item;
-				return true;
+				else
+				{
+					value = readerResult.item;
+					return true;
+				}
 			}
 
 			static void TimerCallback(object state)
 			{
 				AsyncQueueReader thisPtr = (AsyncQueueReader) state;
-				if(thisPtr._inputQueue.RemoveReader(thisPtr))
+				if(thisPtr.inputQueue.RemoveReader(thisPtr))
 				{
-					thisPtr._expired = true;
+					thisPtr.expired = true;
 					thisPtr.Complete(false);
 				}
 			}
 
 			public void Set(Item item)
 			{
-				_item = item.Value;
-				if(_timer != null)
+				this.item = item.Value;
+				if(this.timer != null)
 				{
-					_timer.Change(-1, -1);
+					this.timer.Change(-1, -1);
 				}
 				Complete(false, item.Exception);
 			}
 		}
 
-		internal struct Item
+		public struct Item
 		{
-			private T _value;
-			private Exception _exception;
-			ItemDequeuedCallback _dequeuedCallback;
-
-			/// <summary>
-			/// Initializes a new instance of the <see cref="InputQueue&lt;T&gt;.Item"/> class.
-			/// </summary>
-			/// <param name="value">The value.</param>
-			/// <param name="dequeuedCallback">The dequeued callback.</param>
+			T value;
+			Exception exception;
+			ItemDequeuedCallback dequeuedCallback;
+
 			public Item(T value, ItemDequeuedCallback dequeuedCallback)
 				: this(value, null, dequeuedCallback)
 			{
 			}
 
-			/// <summary>
-			/// Initializes a new instance of the <see cref="InputQueue&lt;T&gt;.Item"/> class.
-			/// </summary>
-			/// <param name="exception">The exception.</param>
-			/// <param name="dequeuedCallback">The dequeued callback.</param>
 			public Item(Exception exception, ItemDequeuedCallback dequeuedCallback)
 				: this(null, exception, dequeuedCallback)
 			{
 			}
 
-			/// <summary>
-			/// Initializes a new instance of the <see cref="InputQueue&lt;T&gt;.Item"/> class.
-			/// </summary>
-			/// <param name="value">The value.</param>
-			/// <param name="exception">The exception.</param>
-			/// <param name="dequeuedCallback">The dequeued callback.</param>
-			internal Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback)
-			{
-				_value = value;
-				_exception = exception;
-				_dequeuedCallback = dequeuedCallback;
-			}
-
-			/// <summary>
-			/// Gets the exception.
-			/// </summary>
-			/// <value>The exception.</value>
+			Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback)
+			{
+				this.value = value;
+				this.exception = exception;
+				this.dequeuedCallback = dequeuedCallback;
+			}
+
 			public Exception Exception
 			{
-				get { return _exception; }
+				get { return this.exception; }
 			}
 
-			/// <summary>
-			/// Gets the value.
-			/// </summary>
-			/// <value>The value.</value>
 			public T Value
 			{
-				get { return _value; }
+				get { return value; }
 			}
 
-			/// <summary>
-			/// Gets the dequeued callback.
-			/// </summary>
-			/// <value>The dequeued callback.</value>
 			public ItemDequeuedCallback DequeuedCallback
 			{
-				get { return _dequeuedCallback; }
+				get { return dequeuedCallback; }
 			}
 
-			/// <summary>
-			/// Releases unmanaged and - optionally - managed resources
-			/// </summary>
 			public void Dispose()
 			{
-				if(_value != null)
+				if(value != null)
 				{
-					if(_value is IDisposable)
+					if(value is IDisposable)
 					{
-						((IDisposable) _value).Dispose();
+						((IDisposable) value).Dispose();
 					}
-					else if(_value is ICommunicationObject)
+					else if(value is ICommunicationObject)
 					{
-						((ICommunicationObject) _value).Abort();
+						((ICommunicationObject) value).Abort();
 					}
 				}
 			}
 
-			/// <summary>
-			/// Gets the value.
-			/// </summary>
-			/// <returns></returns>
 			public T GetValue()
 			{
-				if(_exception != null)
+				if(this.exception != null)
 				{
-					throw _exception;
+					throw this.exception;
 				}
 
-				return _value;
+				return this.value;
 			}
 		}
 
-		internal class WaitQueueWaiter : IQueueWaiter
+		class WaitQueueWaiter : IQueueWaiter
 		{
-			bool _itemAvailable;
-			ManualResetEvent _waitEvent;
-			object _thisLock = new object();
+			bool itemAvailable;
+			ManualResetEvent waitEvent;
+			object thisLock = new object();
 
-			/// <summary>
-			/// Initializes a new instance of the <see cref="InputQueue&lt;T&gt;.WaitQueueWaiter"/> class.
-			/// </summary>
 			public WaitQueueWaiter()
 			{
-				_waitEvent = new ManualResetEvent(false);
+				waitEvent = new ManualResetEvent(false);
 			}
 
-			/// <summary>
-			/// Gets the this lock.
-			/// </summary>
-			/// <value>The this lock.</value>
 			object ThisLock
 			{
 				get
 				{
-					return _thisLock;
+					return this.thisLock;
 				}
 			}
 
-			/// <summary>
-			/// Sets the specified item available.
-			/// </summary>
-			/// <param name="itemAvailable">if set to <see langword="true"/> [item available].</param>
 			public void Set(bool itemAvailable)
 			{
 				lock(ThisLock)
 				{
-					_itemAvailable = itemAvailable;
-					_waitEvent.Set();
+					this.itemAvailable = itemAvailable;
+					waitEvent.Set();
 				}
 			}
 
-			/// <summary>
-			/// Waits the specified timeout.
-			/// </summary>
-			/// <param name="timeout">The timeout.</param>
-			/// <returns></returns>
 			public bool Wait(TimeSpan timeout)
 			{
 				if(timeout == TimeSpan.MaxValue)
 				{
-					_waitEvent.WaitOne();
+					waitEvent.WaitOne();
 				}
-				else if(!_waitEvent.WaitOne(timeout, false))
+				else if(!waitEvent.WaitOne(timeout, false))
 				{
 					return false;
 				}
 
-				return _itemAvailable;
+				return this.itemAvailable;
 			}
 		}
 
-		internal class AsyncQueueWaiter : AsyncResult, IQueueWaiter
+		class AsyncQueueWaiter : AsyncResult, IQueueWaiter
 		{
 			static TimerCallback timerCallback = new TimerCallback(AsyncQueueWaiter.TimerCallback);
-			Timer _timer;
-			bool _itemAvailable;
-			object _thisLock = new object();
-
-			/// <summary>
-			/// Initializes a new instance of the <see cref="InputQueue&lt;T&gt;.AsyncQueueWaiter"/> class.
-			/// </summary>
-			/// <param name="timeout">The timeout.</param>
-			/// <param name="callback">The callback.</param>
-			/// <param name="state">The state.</param>
+			Timer timer;
+			bool itemAvailable;
+			object thisLock = new object();
+
 			public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state)
 				: base(callback, state)
 			{
 				if(timeout != TimeSpan.MaxValue)
 				{
-					_timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
+					this.timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
 				}
 			}
 
-			/// <summary>
-			/// Gets the this lock.
-			/// </summary>
-			/// <value>The this lock.</value>
 			object ThisLock
 			{
 				get
 				{
-					return _thisLock;
+					return this.thisLock;
 				}
 			}
 
-			/// <summary>
-			/// Ends the specified result.
-			/// </summary>
-			/// <param name="result">The result.</param>
-			/// <returns></returns>
 			public static bool End(IAsyncResult result)
 			{
 				AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
-				return waiterResult._itemAvailable;
+				return waiterResult.itemAvailable;
 			}
 
-			/// <summary>
-			/// Callback that is invoked when the timer completes.
-			/// </summary>
-			/// <param name="state">The state.</param>
-			public static void TimerCallback(object state)
+			static void TimerCallback(object state)
 			{
 				AsyncQueueWaiter thisPtr = (AsyncQueueWaiter) state;
 				thisPtr.Complete(false);
 			}
 
-			/// <summary>
-			/// Sets the specified item available.
-			/// </summary>
-			/// <param name="itemAvailable">if set to <see langword="true"/> [item available].</param>
 			public void Set(bool itemAvailable)
 			{
 				bool timely;
 
 				lock(ThisLock)
 				{
-					timely = (_timer == null) || _timer.Change(-1, -1);
-					_itemAvailable = itemAvailable;
+					timely = (this.timer == null) || this.timer.Change(-1, -1);
+					this.itemAvailable = itemAvailable;
 				}
 
 				if(timely)
@@ -1071,142 +1039,104 @@
 			}
 		}
 
-		internal class ItemQueue
+		class ItemQueue
 		{
-			Item[] _items;
-			int _head;
-			int _pendingCount;
-			int _totalCount;
-
-			/// <summary>
-			/// Initializes a new instance of the <see cref="InputQueue&lt;T&gt;.ItemQueue"/> class.
-			/// </summary>
+			Item[] items;
+			int head;
+			int pendingCount;
+			int totalCount;
+
 			public ItemQueue()
 			{
-				_items = new Item[1];
+				items = new Item[1];
 			}
 
-			/// <summary>
-			/// Dequeues the available item.
-			/// </summary>
-			/// <returns></returns>
 			public Item DequeueAvailableItem()
 			{
-				if(_totalCount == _pendingCount)
+				if(totalCount == pendingCount)
 				{
-					throw new Exception("Internal Error - ItemQueue does not contain any available items");
+					Debug.Assert(false, "ItemQueue does not contain any available items");
+					throw new Exception("Internal Error");
 				}
 				return DequeueItemCore();
 			}
 
-			/// <summary>
-			/// Dequeues any item.
-			/// </summary>
-			/// <returns></returns>
 			public Item DequeueAnyItem()
 			{
-				if(_pendingCount == _totalCount)
+				if(pendingCount == totalCount)
 				{
-					_pendingCount--;
+					pendingCount--;
 				}
 				return DequeueItemCore();
 			}
 
-			/// <summary>
-			/// Enqueues the item core.
-			/// </summary>
-			/// <param name="item">The item.</param>
 			void EnqueueItemCore(Item item)
 			{
-				if(_totalCount == _items.Length)
+				if(totalCount == items.Length)
 				{
-					Item[] newItems = new Item[_items.Length * 2];
-					for(int i = 0; i < _totalCount; i++)
+					Item[] newItems = new Item[items.Length * 2];
+					for(int i = 0; i < totalCount; i++)
 					{
-						newItems[i] = _items[(_head + i) % _items.Length];
+						newItems[i] = items[(head + i) % items.Length];
 					}
-					_head = 0;
-					_items = newItems;
+
+					head = 0;
+					items = newItems;
 				}
-				int tail = (_head + _totalCount) % _items.Length;
-				_items[tail] = item;
-				_totalCount++;
+				int tail = (head + totalCount) % items.Length;
+				items[tail] = item;
+				totalCount++;
 			}
 
-			/// <summary>
-			/// Dequeues the item core.
-			/// </summary>
-			/// <returns></returns>
 			Item DequeueItemCore()
 			{
-				if(_totalCount == 0)
+				if(totalCount == 0)
 				{
-					throw new Exception("Internal Error - ItemQueue does not contain any items");
+					Debug.Assert(false, "ItemQueue does not contain any items");
+					throw new Exception("Internal Error");
 				}
-				Item item = _items[_head];
-				_items[_head] = new Item();
-				_totalCount--;
-				_head = (_head + 1) % _items.Length;
+				Item item = items[head];
+				items[head] = new Item();
+				totalCount--;
+				head = (head + 1) % items.Length;
 				return item;
 			}
 
-			/// <summary>
-			/// Enqueues the pending item.
-			/// </summary>
-			/// <param name="item">The item.</param>
 			public void EnqueuePendingItem(Item item)
 			{
 				EnqueueItemCore(item);
-				_pendingCount++;
+				pendingCount++;
 			}
 
-			/// <summary>
-			/// Enqueues the available item.
-			/// </summary>
-			/// <param name="item">The item.</param>
 			public void EnqueueAvailableItem(Item item)
 			{
 				EnqueueItemCore(item);
 			}
 
-			/// <summary>
-			/// Makes the pending item available.
-			/// </summary>
 			public void MakePendingItemAvailable()
 			{
-				if(_pendingCount == 0)
+				if(pendingCount == 0)
 				{
-					throw new Exception("Internal Error - ItemQueue does not contain any pending items");
+					Debug.Assert(false, "ItemQueue does not contain any pending items");
+					throw new Exception("Internal Error");
 				}
-				_pendingCount--;
+				pendingCount--;
 			}
 
-			/// <summary>
-			/// Gets a value indicating whether this instance has available items.
-			/// </summary>
-			/// <value>
-			/// 	<see langword="true"/> if this instance has available item; otherwise, <see langword="false"/>.
-			/// </value>
 			public bool HasAvailableItem
 			{
-				get { return _totalCount > _pendingCount; }
+				get { return totalCount > pendingCount; }
 			}
 
-			/// <summary>
-			/// Gets a value indicating whether this instance has any item.
-			/// </summary>
-			/// <value>
-			/// 	<see langword="true"/> if this instance has any item; otherwise, <see langword="false"/>.
-			/// </value>
 			public bool HasAnyItem
 			{
-				get { return _totalCount > 0; }
+				get { return totalCount > 0; }
 			}
 
 			public int ItemCount
 			{
-				get { return _totalCount; }
+				get { return totalCount; }
 			}
 		}
 	}
-}
\ No newline at end of file
+}

Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs Fri Dec 12 10:25:52 2008
@@ -25,18 +25,20 @@
 	/// <summary>
 	/// Factory for message channels.
 	/// </summary>
-	public class NmsChannelFactory : ChannelFactoryBase<IOutputChannel>
+	public class NmsChannelFactory<TChannel> : ChannelFactoryBase<TChannel>
 	{
 		#region Constructors
 
 		/// <summary>
-		/// Initializes a new instance of the <see cref="NmsChannelFactory"/> class.
+		/// Initializes a new instance of the <see cref="NmsChannelFactory&lt;TChannel&gt;"/> class.
 		/// </summary>
+		/// <param name="bindingElement">The binding element.</param>
 		/// <param name="context">The context.</param>
-		/// <param name="transportElement">The binding element.</param>
-		internal NmsChannelFactory(NmsTransportBindingElement transportElement, BindingContext context)
+		internal NmsChannelFactory(NmsTransportBindingElement bindingElement, BindingContext context)
 			: base(context.Binding)
 		{
+			_bindingElement = bindingElement;
+
 			Collection<MessageEncodingBindingElement> messageEncoderBindingElements = context.BindingParameters.FindAll<MessageEncodingBindingElement>();
 			if(messageEncoderBindingElements.Count > 1)
 			{
@@ -46,9 +48,9 @@
 				? NmsConstants.DefaultMessageEncoderFactory
 				: messageEncoderBindingElements[0].CreateMessageEncoderFactory();
 
-			_bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, Int32.MaxValue);
-			_destination = transportElement.Destination;
-			_destinationType = transportElement.DestinationType;
+			_bufferManager = BufferManager.CreateBufferManager(bindingElement.MaxBufferPoolSize, Int32.MaxValue);
+			_destination = bindingElement.Destination;
+			_destinationType = bindingElement.DestinationType;
 
 			Tracer.DebugFormat("Destination ({0}) : {1}", _destinationType, _destination);
 		}
@@ -100,9 +102,24 @@
 		/// </returns>
 		/// <param name="address">The <see cref="T:System.ServiceModel.EndpointAddress" /> of the remote endpoint to which the channel sends messages.</param>
 		/// <param name="via">The <see cref="T:System.Uri" /> that contains the transport address to which messages are sent on the output channel.</param>
-		protected override IOutputChannel OnCreateChannel(EndpointAddress address, Uri via)
+		protected override TChannel OnCreateChannel(EndpointAddress address, Uri via)
 		{
-			return new NmsOutputChannel(BufferManager, MessageEncoderFactory, address, this, via);
+			if(!String.Equals(address.Uri.Scheme, _bindingElement.Scheme, StringComparison.InvariantCultureIgnoreCase))
+			{
+				throw new ArgumentException(String.Format("The scheme {0} specified in address is not supported.", address.Uri.Scheme), "remoteAddress");
+			}
+
+			if(typeof(TChannel) == typeof(IOutputChannel))
+			{
+				return (TChannel) (object) new NmsOutputChannel(this, address, via, BufferManager, MessageEncoderFactory, Destination, DestinationType);
+			}
+
+			if(typeof(TChannel) == typeof(IOutputSessionChannel))
+			{
+				return (TChannel) (object) new NmsOutputSessionChannel(this, via, address, BufferManager, MessageEncoderFactory, Destination, DestinationType);
+			}
+
+			throw new NotSupportedException(String.Format("The requested channel type {0} is not supported", typeof(TChannel)));
 		}
 
 		#endregion
@@ -156,7 +173,8 @@
 		private readonly MessageEncoderFactory _encoderFactory;
 		private readonly string _destination;
 		private readonly DestinationType _destinationType;
+		private readonly NmsTransportBindingElement _bindingElement;
 
 		#endregion
 	}
-}
\ No newline at end of file
+}

Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs Fri Dec 12 10:25:52 2008
@@ -38,33 +38,11 @@
 		}
 
 		/// <summary>
-		/// Gets the name of the queue from the URI.
+		/// Creates a unique session identifier.
 		/// </summary>
-		/// <param name="uri">The URI of the message queue.</param>
-		public static string GetQueueName(Uri uri)
+		public static string CreateUniqueSessionId()
 		{
-			return uri.LocalPath.TrimStart('/');
-		}
-
-		/// <summary>
-		/// Gets the destination.
-		/// </summary>
-		/// <param name="session">The session.</param>
-		/// <param name="destination">The destination.</param>
-		/// <param name="destinationType">Type of the destination.</param>
-		public static IDestination GetDestination(NMS.ISession session, string destination, DestinationType destinationType)
-		{
-			switch(destinationType)
-			{
-			case DestinationType.Topic:
-			return session.GetTopic(destination);
-			case DestinationType.TemporaryQueue:
-			return session.CreateTemporaryQueue();
-			case DestinationType.TemporaryTopic:
-			return session.CreateTemporaryTopic();
-			default:
-			return session.GetQueue(destination);
-			}
+			return "uuid:/session-gram/" + Guid.NewGuid();
 		}
 	}
-}
\ No newline at end of file
+}

Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannel.cs Fri Dec 12 10:25:52 2008
@@ -18,323 +18,192 @@
 using System;
 using System.ServiceModel;
 using System.ServiceModel.Channels;
-using System.Text;
 
 namespace Apache.NMS.WCF
 {
 	/// <summary>
-	/// Channel for receiving messages.
+	/// Server-side implementation of the sessionless one-way channel.
 	/// </summary>
-	public class NmsInputChannel : NmsChannelBase, IInputChannel
+	public class NmsInputChannel : NmsInputQueueChannelBase<Message>, IInputChannel
 	{
 		#region Constructors
 
 		/// <summary>
 		/// Initializes a new instance of the <see cref="NmsInputChannel"/> class.
 		/// </summary>
-		/// <param name="bufferManager">The buffer manager.</param>
-		/// <param name="encoderFactory">The encoder factory.</param>
-		/// <param name="address">The address.</param>
-		/// <param name="parent">The parent.</param>
-		/// <exception cref="T:System.ArgumentNullException">
-		/// 	<paramref name="channelManager"/> is null.</exception>
-		public NmsInputChannel(BufferManager bufferManager, MessageEncoderFactory encoderFactory, EndpointAddress address, NmsChannelListener parent)
-			: base(bufferManager, encoderFactory, address, parent, parent.Destination, parent.DestinationType)
+		/// <param name="factory">The factory that was used to create the channel.</param>
+		/// <param name="localAddress">The local address of the channel.</param>
+		internal NmsInputChannel(ChannelListenerBase factory, EndpointAddress localAddress)
+			: base(factory, localAddress)
 		{
-			_localAddress = address;
-			_messages = new InputQueue<Message>();
 		}
 
 		#endregion
 
-		//Hands the message off to other components higher up the
-		//channel stack that have previously called BeginReceive() 
-		//and are waiting for messages to arrive on this channel.
-		internal void Dispatch(Message message)
-		{
-			_messages.EnqueueAndDispatch(message);
-		}
+		#region Receive
 
 		/// <summary>
-		/// Gets the property.
-		/// </summary>
-		/// <typeparam name="T">The type of the property to attempt to retrieve.</typeparam>
-		public override T GetProperty<T>()
-		{
-			if(typeof(T) == typeof(IInputChannel))
-			{
-				return (T) (object) this;
-			}
-
-			T messageEncoderProperty = Encoder.GetProperty<T>();
-			if(messageEncoderProperty != null)
-			{
-				return messageEncoderProperty;
-			}
-
-			return base.GetProperty<T>();
-		}
-
-		#region IInputChannel Members
-
-		/// <summary>
-		/// Returns the message received, if one is available. If a message is not available, blocks for a default interval of time.
+		/// Begins an asynchronous operation to receive a message that has a state object associated with it.
 		/// </summary>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives the notification of the asynchronous operation completion.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
 		/// <returns>
-		/// The <see cref="T:System.ServiceModel.Channels.Message" /> received. 
+		/// The <see cref="T:System.IAsyncResult"/> that references the asynchronous message reception.
 		/// </returns>
-		public Message Receive()
+		public IAsyncResult BeginReceive(AsyncCallback callback, object state)
 		{
-			return Receive(DefaultReceiveTimeout);
+			return BeginReceive(DefaultReceiveTimeout, callback, state);
 		}
 
 		/// <summary>
-		/// Returns the message received, if one is available. If a message is not available, blocks for a specified interval of time.
+		/// Begins an asynchronous operation to receive a message that has a specified time out and state object associated with it.
 		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies the interval of time to wait for a message to become available.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives the notification of the asynchronous operation completion.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
 		/// <returns>
-		/// The <see cref="T:System.ServiceModel.Channels.Message" /> received. 
+		/// The <see cref="T:System.IAsyncResult"/> that references the asynchronous receive operation.
 		/// </returns>
-		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the receive operation has to complete before timing out and throwing a <see cref="T:System.TimeoutException" />.</param>
-		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
 		/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
-		public Message Receive(TimeSpan timeout)
+		public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
 		{
-			Message message;
-			if(TryReceive(timeout, out message))
-			{
-				return message;
-			}
-			throw new TimeoutException(String.Format("Receive timed out after {0}.  The time allotted to this operation may have been a portion of a longer timeout.", timeout));
+			return BeginDequeue(timeout, callback, state);
 		}
 
 		/// <summary>
-		/// Tries to receive a message within a specified interval of time. 
+		/// Completes an asynchronous operation to receive a message.
 		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult"/> returned by a call to one of the <c>System.ServiceModel.Channels.IInputChannel.BeginReceive</c> methods.</param>
 		/// <returns>
-		/// true if a message is received before the <paramref name="timeout" /> has been exceeded; otherwise false.
+		/// The <see cref="T:System.ServiceModel.Channels.Message"/> received.
 		/// </returns>
-		/// <param name="timeout">The <see cref="T:System.IAsyncResult" /> returned by a call to one of the <see cref="System.ServiceModel.Channels.IInputChannel.BeginReceive(AsyncCallback, object)" /> methods.</param>
-		/// <param name="message">The <see cref="T:System.ServiceModel.Channels.Message" /> received. </param>
-		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
-		/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
-		public bool TryReceive(TimeSpan timeout, out Message message)
+		public Message EndReceive(IAsyncResult result)
 		{
-			NmsChannelHelper.ValidateTimeout(timeout);
-			return _messages.Dequeue(timeout, out message);
+			return EndDequeue(result);
 		}
 
 		/// <summary>
-		/// Begins an asynchronous operation to receive a message that has a state object associated with it. 
+		/// Returns the message received, if one is available. If a message is not available, blocks for a default interval of time.
 		/// </summary>
 		/// <returns>
-		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous message reception. 
+		/// The <see cref="T:System.ServiceModel.Channels.Message"/> received.
 		/// </returns>
-		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation completion.</param>
-		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
-		public IAsyncResult BeginReceive(AsyncCallback callback, object state)
+		public Message Receive()
 		{
-			return BeginReceive(DefaultReceiveTimeout, callback, state);
+			return Receive(DefaultReceiveTimeout);
 		}
 
 		/// <summary>
-		/// Begins an asynchronous operation to receive a message that has a specified time out and state object associated with it. 
+		/// Returns the message received, if one is available. If a message is not available, blocks for a specified interval of time.
 		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the receive operation has to complete before timing out and throwing a <see cref="T:System.TimeoutException"/>.</param>
 		/// <returns>
-		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous receive operation.
+		/// The <see cref="T:System.ServiceModel.Channels.Message"/> received.
 		/// </returns>
-		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies the interval of time to wait for a message to become available.</param>
-		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation completion.</param>
-		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
-		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
 		/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
-		public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
+		public Message Receive(TimeSpan timeout)
 		{
-			return BeginTryReceive(timeout, callback, state);
+			return this.Dequeue(timeout);
 		}
 
-		/// <summary>
-		/// Completes an asynchronous operation to receive a message. 
-		/// </summary>
-		/// <returns>
-		/// The <see cref="T:System.ServiceModel.Channels.Message" /> received. 
-		/// </returns>
-		/// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to one of the <see cref="System.ServiceModel.Channels.IInputChannel.BeginReceive(AsyncCallback, object)" /> methods.</param>
-		public Message EndReceive(IAsyncResult result)
-		{
-			return _messages.EndDequeue(result);
-		}
+		#endregion
+
+		#region TryReceive
 
 		/// <summary>
-		/// Begins an asynchronous operation to receive a message that has a specified time out and state object associated with it. 
+		/// Begins an asynchronous operation to receive a message that has a specified time out and state object associated with it.
 		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies the interval of time to wait for a message to become available.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives the notification of the asynchronous operation completion.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
 		/// <returns>
-		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous receive operation.
+		/// The <see cref="T:System.IAsyncResult"/> that references the asynchronous receive operation.
 		/// </returns>
-		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies the interval of time to wait for a message to become available.</param>
-		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation completion.</param>
-		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
-		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
 		/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
 		public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
 		{
-			NmsChannelHelper.ValidateTimeout(timeout);
-			return _messages.BeginDequeue(timeout, callback, state);
+			return BeginDequeue(timeout, callback, state);
 		}
 
 		/// <summary>
 		/// Completes the specified asynchronous operation to receive a message.
 		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult"/> returned by a call to the <see cref="M:System.ServiceModel.Channels.IInputChannel.BeginTryReceive(System.TimeSpan,System.AsyncCallback,System.Object)"/> method.</param>
+		/// <param name="message">The <see cref="T:System.ServiceModel.Channels.Message"/> received.</param>
 		/// <returns>
 		/// true if a message is received before the specified interval of time elapses; otherwise false.
 		/// </returns>
-		/// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.ServiceModel.Channels.IInputChannel.BeginTryReceive(System.TimeSpan,System.AsyncCallback,System.Object)" /> method.</param>
-		/// <param name="message">The <see cref="T:System.ServiceModel.Channels.Message" /> received. </param>
 		public bool EndTryReceive(IAsyncResult result, out Message message)
 		{
-			return _messages.EndDequeue(result, out message);
+			message = null;
+			return TryDequeue(result, out message);
 		}
 
 		/// <summary>
-		/// Returns a value that indicates whether a message has arrived within a specified interval of time.
+		/// Tries to receive a message within a specified interval of time.
 		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long to wait for a message to arrive before timing out.</param>
+		/// <param name="message">The <see cref="T:System.ServiceModel.Channels.Message"/> received.</param>
 		/// <returns>
-		/// true if a message has arrived before the <paramref name="timeout" /> has been exceeded; otherwise false.
+		/// true if a message is received before the <paramref name="timeout"/> has been exceeded; otherwise false.
 		/// </returns>
-		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> specifies the maximum interval of time to wait for a message to arrive before timing out.</param>
-		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
 		/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
-		public bool WaitForMessage(TimeSpan timeout)
+		public bool TryReceive(TimeSpan timeout, out Message message)
 		{
-			NmsChannelHelper.ValidateTimeout(timeout);
-			return _messages.WaitForItem(timeout);
+			message = Receive(timeout);
+			return true;
 		}
 
+		#endregion
+
+		#region WaitForMessage
+
 		/// <summary>
-		/// Begins an asynchronous wait-for-a-message-to-arrive operation that has a specified time out and state object associated with it. 
+		/// Begins an asynchronous wait-for-a-message-to-arrive operation that has a specified time out and state object associated with it.
 		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies the interval of time to wait for a message to become available.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives the notification of the asynchronous operation completion.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
 		/// <returns>
-		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous operation to wait for a message to arrive.
+		/// The <see cref="T:System.IAsyncResult"/> that references the asynchronous operation to wait for a message to arrive.
 		/// </returns>
-		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies the interval of time to wait for a message to become available.</param>
-		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation completion.</param>
-		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous operation.</param>
-		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout" /> is exceeded before the operation is completed.</exception>
+		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
 		/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
 		public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
 		{
-			NmsChannelHelper.ValidateTimeout(timeout);
-			return _messages.BeginWaitForItem(timeout, callback, state);
+			throw new NotImplementedException();
 		}
 
 		/// <summary>
 		/// Completes the specified asynchronous wait-for-a-message operation.
 		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult"/> that identifies the <see cref="M:System.ServiceModel.Channels.IInputChannel.BeginWaitForMessage(System.TimeSpan,System.AsyncCallback,System.Object)"/> operation to finish, and from which to retrieve an end result.</param>
 		/// <returns>
-		/// true if a message has arrived before the timeout has been exceeded; otherwise false.
+		/// true if a message has arrived before the timeout has been exceeded; otherwise false.
 		/// </returns>
-		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that identifies the <see cref="M:System.ServiceModel.Channels.IInputChannel.BeginWaitForMessage(System.TimeSpan,System.AsyncCallback,System.Object)" /> operation to finish, and from which to retrieve an end result.</param>
 		public bool EndWaitForMessage(IAsyncResult result)
 		{
-			return _messages.EndWaitForItem(result);
+			throw new NotImplementedException();
 		}
 
 		/// <summary>
-		/// Gets the address on which the input channel receives messages. 
-		/// </summary>
-		/// <returns>
-		/// The <see cref="T:System.ServiceModel.EndpointAddress" /> on which the input channel receives messages. 
-		/// </returns>
-		public EndpointAddress LocalAddress
-		{
-			get { return _localAddress; }
-		}
-
-		#endregion
-
-		/// <summary>
-		/// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous abort operation.
-		/// </summary>
-		protected override void OnAbort()
-		{
-			OnClose(TimeSpan.Zero);
-		}
-
-		/// <summary>
-		/// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation.
-		/// </summary>
-		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param>
-		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
-		protected override void OnClose(TimeSpan timeout)
-		{
-			NmsChannelHelper.ValidateTimeout(timeout);
-			_messages.Close();
-		}
-
-		/// <summary>
-		/// Completes an asynchronous operation on the close of a communication object.
-		/// </summary>
-		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndClose(System.IAsyncResult)" /> method.</param>
-		protected override void OnEndClose(IAsyncResult result)
-		{
-			CompletedAsyncResult.End(result);
-		}
-
-		/// <summary>
-		/// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation.
-		/// </summary>
-		/// <returns>
-		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on close operation. 
-		/// </returns>
-		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param>
-		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on close operation.</param>
-		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
-		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
-		protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
-		{
-			OnClose(timeout);
-			return new CompletedAsyncResult(callback, state);
-		}
-
-		/// <summary>
-		/// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time.
-		/// </summary>
-		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
-		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
-		/// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout" /> that was allotted for the operation was exceeded before the operation was completed.</exception>
-		protected override void OnOpen(TimeSpan timeout)
-		{
-		}
-
-		/// <summary>
-		/// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation.
+		/// Returns a value that indicates whether a message has arrived within a specified interval of time.
 		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies the maximum interval of time to wait for a message to arrive before timing out.</param>
 		/// <returns>
-		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on open operation. 
+		/// true if a message has arrived before the <paramref name="timeout"/> has been exceeded; otherwise false.
 		/// </returns>
-		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
-		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on open operation.</param>
-		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
-		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
-		protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
-		{
-			return new CompletedAsyncResult(callback, state);
-		}
-
-		/// <summary>
-		/// Completes an asynchronous operation on the open of a communication object.
-		/// </summary>
-		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndOpen(System.IAsyncResult)" /> method.</param>
-		/// <exception cref="T:System.TimeoutException">The interval of time specified by the timeout that was allotted for the operation was exceeded before the operation was completed.</exception>
-		protected override void OnEndOpen(IAsyncResult result)
+		/// <exception cref="T:System.TimeoutException">The specified <paramref name="timeout"/> is exceeded before the operation is completed.</exception>
+		/// <exception cref="T:System.ArgumentOutOfRangeException">The timeout specified is less than zero.</exception>
+		public bool WaitForMessage(TimeSpan timeout)
 		{
-			CompletedAsyncResult.End(result);
+			throw new NotImplementedException();
 		}
 
-		#region Private members
-
-		private readonly InputQueue<Message> _messages;
-		private EndpointAddress _localAddress;
-
 		#endregion
 	}
-}
\ No newline at end of file
+}

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs?rev=726083&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputChannelListener.cs Fri Dec 12 10:25:52 2008
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+using System.Text;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Server-side listener for sessionless input channels.
+	/// </summary>
+	public class NmsInputChannelListener : ChannelListenerBase<IInputChannel>
+	{
+		#region Constructors
+
+		/// <summary>
+		/// Initializes a new instance of the <see cref="NmsInputChannelListener"/> class.
+		/// </summary>
+		/// <param name="transportElement">The binding element.</param>
+		/// <param name="context">The context.</param>
+		internal NmsInputChannelListener(NmsTransportBindingElement transportElement, BindingContext context)
+			: base(context.Binding)
+		{
+			_bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, (int) transportElement.MaxReceivedMessageSize);
+
+			MessageEncodingBindingElement messageEncoderBindingElement = context.BindingParameters.Remove<MessageEncodingBindingElement>();
+			_messageEncoderFactory = (messageEncoderBindingElement != null)
+				? messageEncoderBindingElement.CreateMessageEncoderFactory()
+				: NmsConstants.DefaultMessageEncoderFactory;
+
+			_channelQueue = new InputQueue<IInputChannel>();
+			_currentChannelLock = new object();
+			_destinationName = transportElement.Destination;
+			_destinationType = transportElement.DestinationType;
+			_uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
+			Tracer.DebugFormat("Listening to {0} at {1}/{2}", _destinationType, _uri, _destinationName);
+		}
+
+		#endregion
+
+		#region Public properties
+
+		/// <summary>
+		/// Gets the message encoder factory.
+		/// </summary>
+		/// <value>The message encoder factory.</value>
+		public MessageEncoderFactory MessageEncoderFactory
+		{
+			get { return _messageEncoderFactory; }
+		}
+
+		/// <summary>
+		/// Gets or sets the destination.
+		/// </summary>
+		/// <value>The destination.</value>
+		public string Destination
+		{
+			get { return _destinationName; }
+			set { _destinationName = value; }
+		}
+
+		/// <summary>
+		/// Gets or sets the type of the destination.
+		/// </summary>
+		/// <value>The type of the destination.</value>
+		public DestinationType DestinationType
+		{
+			get { return _destinationType; }
+			set { _destinationType = value; }
+		}
+
+		#endregion
+
+		#region Implementation of CommunicationObject
+
+		/// <summary>
+		/// Inserts processing on a communication object after it transitions to the closing state 
+		/// due to the invocation of a synchronous abort operation.
+		/// </summary>
+		/// <remarks>
+		/// Abort can be called at any time, so we can't assume that we've been Opened successfully 
+		/// (and thus may not have any listen sockets).
+		/// </remarks>
+		protected override void OnAbort()
+		{
+			OnClose(TimeSpan.Zero);
+		}
+
+		/// <summary>
+		/// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override void OnClose(TimeSpan timeout)
+		{
+			lock(ThisLock)
+			{
+				if(_consumer != null)
+				{
+					Tracer.Debug("Listener is terminating consumer...");
+					_consumer.Close();
+					_consumer.Dispose();
+					_consumer = null; // released: a repeated close/abort must not touch it again
+					Tracer.Debug("Listener has terminated consumer");
+				}
+				if(_session != null)
+				{
+					Tracer.Debug("Listener is terminating session...");
+					_session.Close();
+					_session = null; // released, same reasoning as for the consumer
+					Tracer.Debug("Listener has terminated session");
+				}
+				if(_connection != null)
+				{
+					Tracer.Debug("Listener is terminating connection...");
+					_connection.Stop();
+					_connection.Close();
+					_connection.Dispose();
+					_connection = null; // released, same reasoning as for the consumer
+					Tracer.Debug("Listener has terminated connection");
+				}
+				_channelQueue.Close();
+			}
+		}
+
+		/// <summary>
+		/// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on close operation. 
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on close operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			OnClose(timeout);
+			return new CompletedAsyncResult(callback, state);
+		}
+
+		/// <summary>
+		/// Completes an asynchronous operation on the close of a communication object.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndClose(System.IAsyncResult)" /> method.</param>
+		protected override void OnEndClose(IAsyncResult result)
+		{
+			CompletedAsyncResult.End(result);
+		}
+
+		/// <summary>
+		/// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		/// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout" /> that was allotted for the operation was exceeded before the operation was completed.</exception>
+		protected override void OnOpen(TimeSpan timeout)
+		{
+			if(Uri == null)
+			{
+				throw new InvalidOperationException("Uri must be set before ChannelListener is opened.");
+			}
+			NmsChannelHelper.ValidateTimeout(timeout);
+		}
+
+		/// <summary>
+		/// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on open operation. 
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on open operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+			OnOpen(timeout);
+			return new CompletedAsyncResult(callback, state);
+		}
+
+		/// <summary>
+		/// Completes an asynchronous operation on the open of a communication object.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndOpen(System.IAsyncResult)" /> method.</param>
+		protected override void OnEndOpen(IAsyncResult result)
+		{
+			CompletedAsyncResult.End(result);
+		}
+
+		#endregion
+
+		#region Implementation of ChannelListenerBase
+
+		/// <summary>
+		/// When implemented in derived class, gets the URI on which the channel listener listens for an incoming channel.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.Uri" /> on which the channel listener listens for incoming channels.
+		/// </returns>
+		public override Uri Uri
+		{
+			get { return _uri; }
+		}
+
+		/// <summary>
+		/// When overridden in a derived class, provides a point of extensibility when waiting for a channel to arrive.
+		/// </summary>
+		/// <returns>
+		/// true if the method completed before the interval of time specified by the <paramref name="timeout" /> expired; otherwise false.
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on wait for a channel operation has to complete before timing out.</param>
+		protected override bool OnWaitForChannel(TimeSpan timeout)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+			return _channelQueue.WaitForItem(timeout);
+		}
+
+		/// <summary>
+		/// When implemented in a derived class, provides a point of extensibility when starting to wait for a channel to arrive.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on begin wait operation. 
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on begin wait operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation on begin wait completion.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on begin wait operation.</param>
+		protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+			return _channelQueue.BeginWaitForItem(timeout, callback, state);
+		}
+
+		/// <summary>
+		/// When implemented in a derived class, provides a point of extensibility when ending the waiting for a channel to arrive.
+		/// </summary>
+		/// <returns>
+		/// true if the method completed before the timeout expired; otherwise false.
+		/// </returns>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.ServiceModel.Channels.ChannelListenerBase.OnBeginWaitForChannel(System.TimeSpan,System.AsyncCallback,System.Object)" /> method.</param>
+		protected override bool OnEndWaitForChannel(IAsyncResult result)
+		{
+			return _channelQueue.EndWaitForItem(result);
+		}
+
+		/// <summary>
+		/// When implemented in a derived class, provides an extensibility point when accepting a channel.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.ServiceModel.Channels.IChannel" /> accepted.
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+		protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
+		{
+			Tracer.Debug("Accepting channel");
+			NmsChannelHelper.ValidateTimeout(timeout);
+			if(!IsDisposed)
+			{
+				EnsureChannelAvailable();
+			}
+
+			IInputChannel channel;
+			if(_channelQueue.Dequeue(timeout, out channel))
+			{
+				return channel;
+			}
+			throw new TimeoutException(String.Format("Accept on listener at address {0} timed out after {1}.", Uri.AbsoluteUri, timeout));
+		}
+
+		/// <summary>
+		/// When implemented in a derived class, provides an asynchronous extensibility point when beginning to accept a channel.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous accept channel operation. 
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous completion of the accept channel operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous accept channel operation.</param>
+		protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+			if(!IsDisposed)
+			{
+				EnsureChannelAvailable();
+			}
+			return _channelQueue.BeginDequeue(timeout, callback, state);
+		}
+
+		/// <summary>
+		/// When implemented in a derived class, provides an asynchronous extensibility point when completing the acceptance of a channel.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.ServiceModel.Channels.IChannel" /> accepted by the listener.
+		/// </returns>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.ServiceModel.Channels.ChannelListenerBase`1.OnBeginAcceptChannel(System.TimeSpan,System.AsyncCallback,System.Object)" /> method.</param>
+		protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
+		{
+			IInputChannel channel;
+			if(_channelQueue.EndDequeue(result, out channel)) // false is reported to the caller as a timeout
+			{
+				return channel;
+			}
+			throw new TimeoutException(String.Format("Accept on listener at address {0} timed out.", Uri.AbsoluteUri));
+		}
+
+		#endregion
+
+		/// <summary>
+		/// Dispatches the callback.
+		/// </summary>
+		/// <param name="state">The state.</param>
+		internal void DispatchCallback(object state)
+		{
+			Dispatch((Message) state);
+		}
+
+		/// <summary>
+		/// Hands <paramref name="message"/> to the listener's current input channel, creating
+		/// that channel first if none exists; a newly created channel is then queued so that
+		/// a waiting AcceptChannel call can pick it up. A null message is ignored, and any
+		/// exception raised while dispatching is logged and swallowed.
+		/// </summary>
+		internal void Dispatch(Message message)
+		{
+			if(message == null)
+			{
+				return;
+			}
+
+			try
+			{
+				NmsInputChannel newChannel;
+				bool channelCreated = CreateOrRetrieveChannel(out newChannel);
+
+				Tracer.Debug("Dispatching incoming message");
+				newChannel.Dispatch(message);
+
+				if(channelCreated)
+				{
+					//Hand the channel off to whomever is waiting for AcceptChannel() to complete
+					Tracer.Debug("Handing off channel");
+					_channelQueue.EnqueueAndDispatch(newChannel);
+				}
+			}
+			catch(Exception e)
+			{
+				Tracer.ErrorFormat("Error dispatching Message: {0}", e.ToString());
+			}
+		}
+
+		/// <summary>
+		/// Creates or retrieves the channel.
+		/// </summary>
+		/// <param name="newChannel">The channel.</param>
+		private bool CreateOrRetrieveChannel(out NmsInputChannel newChannel)
+		{
+			bool channelCreated = false;
+
+			if((newChannel = _currentChannel) == null)
+			{
+				lock(_currentChannelLock)
+				{
+					if((newChannel = _currentChannel) == null)
+					{
+						newChannel = CreateNmsChannel(Uri);
+						newChannel.Closed += OnChannelClosed;
+						_currentChannel = newChannel;
+						channelCreated = true;
+					}
+				}
+			}
+
+			return channelCreated;
+		}
+
+		/// <summary>
+		/// Called when the channel is closed.
+		/// </summary>
+		/// <param name="sender">The sender.</param>
+		/// <param name="args">The <see cref="System.EventArgs"/> instance containing the event data.</param>
+		private void OnChannelClosed(object sender, EventArgs args)
+		{
+			NmsInputChannel channel = (NmsInputChannel) sender;
+
+			lock(_currentChannelLock)
+			{
+				if(channel == _currentChannel)
+				{
+					_currentChannel = null;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Creates the <see cref="NmsInputChannel" /> that will wait for inbound messages.
+		/// </summary>
+		/// <param name="uri">The URI for the message queue.</param>
+		private NmsInputChannel CreateNmsChannel(Uri uri)
+		{
+			// Open the broker resources only once: a channel created after an earlier one closed
+			// must reuse them, otherwise a second consumer is attached and the old objects leak.
+			_connection = _connection ?? OpenConnection(uri);
+			_session = _session ?? OpenSession(_connection);
+			_destination = _destination ?? SessionUtil.GetDestination(_session, Destination, DestinationType);
+			_consumer = _consumer ?? CreateConsumer(_session, _destination);
+			return new NmsInputChannel(this, new EndpointAddress(uri));
+		}
+
+		/// <summary>
+		/// Opens the connection to the message broker.
+		/// </summary>
+		/// <param name="uri">The URI.</param>
+		/// <returns>An active connection to the ActiveMQ message broker specified by the URI;
+		/// exceptions will be caught by the attached ExceptionListener.</returns>
+		private IConnection OpenConnection(Uri uri)
+		{
+			IConnection connection = ConnectionFactoryManager.GetInstance().CreateConnection(uri);
+			connection.ExceptionListener += OnExceptionThrown;
+			connection.Start();
+			Tracer.Debug("Connection open");
+			return connection;
+		}
+
+		/// <summary>
+		/// Opens a session to communicate with a message queue.
+		/// </summary>
+		/// <param name="connection">The connection to the ActiveMQ message broker.</param>
+		/// <returns>A session.</returns>
+		/// <exception cref="InvalidOperationException">the <paramref name="connection" /> has not yet
+		/// been started.</exception>
+		private ISession OpenSession(IConnection connection)
+		{
+			if(!connection.IsStarted)
+			{
+				throw new InvalidOperationException("The connection has not yet been opened");
+			}
+
+			Tracer.Debug("Opening session...");
+			ISession session = connection.CreateSession();
+			Tracer.Debug("Session open");
+			return session;
+		}
+
+		/// <summary>
+		/// Creates the consumer of messages received on the <paramref name="session"/>.
+		/// </summary>
+		/// <param name="session">The session.</param>
+		/// <param name="destination">The destination.</param>
+		/// <returns>A consumer for any messages received during the session;
+		/// messages will be consumed by the attached Listener.</returns>
+		private IMessageConsumer CreateConsumer(ISession session, IDestination destination)
+		{
+			Tracer.Debug("Creating message listener...");
+			IMessageConsumer consumer = session.CreateConsumer(destination);
+			consumer.Listener += OnReceiveMessage;
+			Tracer.Debug("Created message listener");
+			return consumer;
+		}
+
+		/// <summary>
+		/// Event handler that processes a received message.
+		/// </summary>
+		/// <param name="message">The message.</param>
+		private void OnReceiveMessage(IMessage message)
+		{
+			Tracer.Debug("Decoding message");
+			string soapMsg = ((ITextMessage) message).Text;
+			// UTF-8 rather than ASCII: the ASCII encoding replaces every non-ASCII character with '?',
+			// while pure-ASCII text yields exactly the same bytes either way.
+			// NOTE(review): assumes the configured message encoder reads UTF-8 - confirm against the sender.
+			byte[] encoded = Encoding.UTF8.GetBytes(soapMsg);
+			int dataLength = encoded.Length;
+
+			// Copy into a pooled buffer; it is handed to ReadMessage together with its manager.
+			byte[] pooled = _bufferManager.TakeBuffer(dataLength);
+			Array.Copy(encoded, pooled, dataLength);
+			ArraySegment<byte> data = new ArraySegment<byte>(pooled, 0, dataLength);
+			Message msg = _messageEncoderFactory.Encoder.ReadMessage(data, _bufferManager);
+			Tracer.Debug(msg);
+			Dispatch(msg);
+		}
+
+		/// <summary>
+		/// Called when an exception is thrown by the ActiveMQ listener.
+		/// </summary>
+		/// <remarks>
+		/// <see cref="NMSException" />s will be caught and logged; all other exceptions will
+		/// be thrown back up to the WCF service.
+		/// </remarks>
+		/// <param name="exception">The exception that was thrown.</param>
+		private void OnExceptionThrown(Exception exception)
+		{
+			if(exception is NMSException)
+			{
+				Tracer.ErrorFormat("{0} thrown : {1}\n{2}",
+					exception.GetType().Name,
+					exception.Message,
+					exception.StackTrace);
+				return;
+			}
+
+			// TODO: can we recover from the exception? Do we convert to WCF exceptions?
+			throw exception;
+		}
+
+		/// <summary>
+		/// Guarantees that a channel is attached to this listener.
+		/// </summary>
+		private void EnsureChannelAvailable()
+		{
+			NmsInputChannel newChannel;
+			if(CreateOrRetrieveChannel(out newChannel))
+			{
+				_channelQueue.EnqueueAndDispatch(newChannel);
+			}
+		}
+
+		#region Private members
+
+		private readonly Uri _uri;
+		private IConnection _connection;
+		private ISession _session;
+		private IDestination _destination;
+		private IMessageConsumer _consumer;
+		private readonly InputQueue<IInputChannel> _channelQueue;
+		private NmsInputChannel _currentChannel;
+		private readonly object _currentChannelLock;
+		private readonly MessageEncoderFactory _messageEncoderFactory;
+		private readonly BufferManager _bufferManager;
+		private string _destinationName;
+		private DestinationType _destinationType;
+
+		#endregion
+	}
+}



Mime
View raw message