activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r709503 [3/4] - in /activemq/activemq-dotnet/Apache.NMS.WCF: ./ branches/ tags/ trunk/ trunk/lib/ trunk/lib/net-3.5/ trunk/src/ trunk/src/main/ trunk/src/main/csharp/ trunk/src/main/csharp/Configuration/ trunk/src/main/ndoc/ trunk/src/test/...
Date Fri, 31 Oct 2008 17:43:12 GMT
Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs?rev=709503&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/InputQueue.cs Fri Oct 31 10:43:10 2008
@@ -0,0 +1,1209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.ServiceModel;
+using System.Threading;
+
+namespace Apache.NMS.WCF
+{
+	// ItemDequeuedCallback is called as an item is dequeued from the InputQueue.  The 
+	// InputQueue lock is not held during the callback.  However, the user code is
+	// not notified of the item being available until the callback returns.  If you
+	// are not sure if the callback blocks for a long time, then first call 
+	// IOThreadScheduler.ScheduleCallback to get to a "safe" thread.
+	internal delegate void ItemDequeuedCallback();
+
+	/// <summary>
+	/// Handles asynchronous interactions between producers and consumers. 
+	/// Producers can dispatch available data to the input queue, 
+	/// where it is dispatched to a waiting consumer or stored until a
+	/// consumer becomes available. Consumers can synchronously or asynchronously
+	/// request data from the queue, which is returned when data becomes
+	/// available.
+	/// </summary>
+	/// <typeparam name="T">The type of the items that producers enqueue and consumers dequeue.</typeparam>
+	internal class InputQueue<T> : IDisposable where T : class
+	{
+		//Stores items that are waiting to be accessed.
+		ItemQueue itemQueue;
+
+		//Each IQueueReader represents some consumer that is waiting for
+		//items to appear in the queue. The readerQueue stores them
+		//in an ordered list so consumers get serviced in a FIFO manner.
+		Queue<IQueueReader> readerQueue;
+
+		//Each IQueueWaiter represents some waiter that is waiting for
+		//items to appear in the queue.  When any item appears, all
+		//waiters are signaled.
+		List<IQueueWaiter> waiterList;
+
+		static WaitCallback onInvokeDequeuedCallback;
+		static WaitCallback onDispatchCallback;
+		static WaitCallback completeOutstandingReadersCallback;
+		static WaitCallback completeWaitersFalseCallback;
+		static WaitCallback completeWaitersTrueCallback;
+
+		//Represents the current state of the InputQueue.
+		//as it transitions through its lifecycle.
+		QueueState _queueState;
+		enum QueueState
+		{
+			Open,
+			Shutdown,
+			Closed
+		}
+
+		/// <summary>
+		/// Creates an empty queue in the Open state: no stored items, no queued
+		/// readers and no registered waiters.
+		/// </summary>
+		public InputQueue()
+		{
+			_queueState = QueueState.Open;
+			itemQueue = new ItemQueue();
+			waiterList = new List<IQueueWaiter>();
+			readerQueue = new Queue<IQueueReader>();
+		}
+
+		/// <summary>
+		/// Gets the total number of items currently stored in the item queue
+		/// (pending and available alike), read while holding the queue lock.
+		/// </summary>
+		public int PendingCount
+		{
+			get
+			{
+				int storedItems;
+
+				lock (ThisLock)
+				{
+					storedItems = itemQueue.ItemCount;
+				}
+
+				return storedItems;
+			}
+		}
+
+		// The item queue instance doubles as the monitor object that guards
+		// the item queue, the reader queue, the waiter list and the queue state.
+		object ThisLock
+		{
+			get
+			{
+				return itemQueue;
+			}
+		}
+
+		/// <summary>
+		/// Begins an asynchronous dequeue operation.
+		/// </summary>
+		/// <param name="timeout">Timeout handed to the queued reader; <see cref="TimeSpan.MaxValue"/> means no timer is created.</param>
+		/// <param name="callback">Callback passed through to the returned async result.</param>
+		/// <param name="state">User state passed through to the returned async result.</param>
+		/// <returns>
+		/// A queued <c>AsyncQueueReader</c> when the caller has to wait for an item, otherwise an
+		/// already-completed result carrying the dequeued value (or the default value when the
+		/// queue is closed, or shut down and empty).
+		/// </returns>
+		public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			Item dequeued = default(Item);
+
+			lock (ThisLock)
+			{
+				// A closed queue never hands out items or parks readers.
+				if (_queueState != QueueState.Closed)
+				{
+					if (itemQueue.HasAvailableItem)
+					{
+						dequeued = itemQueue.DequeueAvailableItem();
+					}
+					else if (_queueState == QueueState.Open || itemQueue.HasAnyItem)
+					{
+						// Open: always wait. Shutdown: wait only while pending items remain.
+						AsyncQueueReader pendingReader = new AsyncQueueReader(this, timeout, callback, state);
+						readerQueue.Enqueue(pendingReader);
+						return pendingReader;
+					}
+				}
+			}
+
+			// Outside the lock: notify the producer, then complete synchronously.
+			// GetValue() rethrows the item's exception, if it carries one.
+			InvokeDequeuedCallback(dequeued.DequeuedCallback);
+			return new TypedCompletedAsyncResult<T>(dequeued.GetValue(), callback, state);
+		}
+
+		/// <summary>
+		/// Begins an asynchronous wait for an item to become available.
+		/// </summary>
+		/// <param name="timeout">Timeout handed to the registered waiter.</param>
+		/// <param name="callback">Callback passed through to the returned async result.</param>
+		/// <param name="state">User state passed through to the returned async result.</param>
+		/// <returns>
+		/// A registered <c>AsyncQueueWaiter</c> when the caller has to wait, otherwise an
+		/// already-completed result whose value is <c>true</c>.
+		/// </returns>
+		public IAsyncResult BeginWaitForItem(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			lock (ThisLock)
+			{
+				bool mustWait;
+
+				switch (_queueState)
+				{
+					case QueueState.Open:
+						mustWait = !itemQueue.HasAvailableItem;
+						break;
+
+					case QueueState.Shutdown:
+						// Only worth waiting while pending items can still become available.
+						mustWait = !itemQueue.HasAvailableItem && itemQueue.HasAnyItem;
+						break;
+
+					default:
+						mustWait = false;
+						break;
+				}
+
+				if (mustWait)
+				{
+					AsyncQueueWaiter pendingWaiter = new AsyncQueueWaiter(timeout, callback, state);
+					waiterList.Add(pendingWaiter);
+					return pendingWaiter;
+				}
+			}
+
+			return new TypedCompletedAsyncResult<bool>(true, callback, state);
+		}
+
+		// Thread-pool entry point: completes every reader in the supplied
+		// IQueueReader[] with an empty (default) item.
+		static void CompleteOutstandingReadersCallback(object state)
+		{
+			foreach (IQueueReader outstanding in (IQueueReader[])state)
+			{
+				outstanding.Set(default(Item));
+			}
+		}
+
+		// Thread-pool entry point: signals the supplied IQueueWaiter[] with "no item available".
+		static void CompleteWaitersFalseCallback(object state)
+		{
+			IQueueWaiter[] pendingWaiters = (IQueueWaiter[])state;
+			CompleteWaiters(false, pendingWaiters);
+		}
+
+		// Thread-pool entry point: signals the supplied IQueueWaiter[] with "item available".
+		static void CompleteWaitersTrueCallback(object state)
+		{
+			IQueueWaiter[] pendingWaiters = (IQueueWaiter[])state;
+			CompleteWaiters(true, pendingWaiters);
+		}
+
+		// Signals each waiter, in array order, with the given availability flag
+		// on the calling thread.
+		static void CompleteWaiters(bool itemAvailable, IQueueWaiter[] waiters)
+		{
+			foreach (IQueueWaiter waiter in waiters)
+			{
+				waiter.Set(itemAvailable);
+			}
+		}
+
+		// Signals the waiters from a thread-pool thread instead of the calling thread.
+		// The delegate matching the flag is created on first use and cached in a static field.
+		static void CompleteWaitersLater(bool itemAvailable, IQueueWaiter[] waiters)
+		{
+			WaitCallback completion;
+
+			if (itemAvailable)
+			{
+				if (completeWaitersTrueCallback == null)
+				{
+					completeWaitersTrueCallback = CompleteWaitersTrueCallback;
+				}
+
+				completion = completeWaitersTrueCallback;
+			}
+			else
+			{
+				if (completeWaitersFalseCallback == null)
+				{
+					completeWaitersFalseCallback = CompleteWaitersFalseCallback;
+				}
+
+				completion = completeWaitersFalseCallback;
+			}
+
+			ThreadPool.QueueUserWorkItem(completion, waiters);
+		}
+
+		// Takes a snapshot of all registered waiters and empties the list.
+		// Yields null when nobody is waiting. Every call site in this class
+		// invokes it while holding ThisLock.
+		void GetWaiters(out IQueueWaiter[] waiters)
+		{
+			if (waiterList.Count == 0)
+			{
+				waiters = null;
+				return;
+			}
+
+			waiters = waiterList.ToArray();
+			waiterList.Clear();
+		}
+
+		/// <summary>
+		/// Closes the queue; equivalent to calling <see cref="IDisposable.Dispose"/> on it.
+		/// </summary>
+		public void Close()
+		{
+			IDisposable self = this;
+			self.Dispose();
+		}
+
+		/// <summary>
+		/// Moves an open queue to the Shutdown state. Has no effect when the queue is
+		/// already shut down or closed. If readers are queued and no items are stored,
+		/// those readers are released with an empty item on the calling thread.
+		/// </summary>
+		public void Shutdown()
+		{
+			IQueueReader[] orphanedReaders = null;
+
+			lock (ThisLock)
+			{
+				// Only the Open -> Shutdown transition does any work.
+				if (_queueState != QueueState.Open)
+				{
+					return;
+				}
+
+				_queueState = QueueState.Shutdown;
+
+				if (readerQueue.Count > 0 && itemQueue.ItemCount == 0)
+				{
+					orphanedReaders = readerQueue.ToArray();
+					readerQueue.Clear();
+				}
+			}
+
+			if (orphanedReaders == null)
+			{
+				return;
+			}
+
+			// Completed outside the lock, in FIFO order.
+			foreach (IQueueReader orphan in orphanedReaders)
+			{
+				orphan.Set(new Item((Exception)null, null));
+			}
+		}
+
+		/// <summary>
+		/// Synchronously dequeues an item, waiting up to <paramref name="timeout"/>.
+		/// </summary>
+		/// <param name="timeout">Maximum time to wait for an item.</param>
+		/// <returns>The dequeued value.</returns>
+		/// <exception cref="TimeoutException">The wait timed out before an item was delivered.</exception>
+		public T Dequeue(TimeSpan timeout)
+		{
+			T dequeued;
+			bool completedInTime = Dequeue(timeout, out dequeued);
+
+			if (completedInTime)
+			{
+				return dequeued;
+			}
+
+			throw new TimeoutException(string.Format("Dequeue timed out in {0}.", timeout));
+		}
+
+		/// <summary>
+		/// Synchronously dequeues an item, blocking the calling thread when none is available yet.
+		/// </summary>
+		/// <param name="timeout">Maximum time to block while waiting for an item.</param>
+		/// <param name="value">
+		/// Receives the dequeued value, or the default value when the queue is closed,
+		/// or shut down with nothing stored.
+		/// </param>
+		/// <returns>The result of the blocking wait when one was needed; otherwise <c>true</c>.</returns>
+		public bool Dequeue(TimeSpan timeout, out T value)
+		{
+			WaitQueueReader blockedReader = null;
+			Item dequeued = new Item();
+
+			lock (ThisLock)
+			{
+				if (_queueState == QueueState.Closed)
+				{
+					value = default(T);
+					return true;
+				}
+
+				if (itemQueue.HasAvailableItem)
+				{
+					dequeued = itemQueue.DequeueAvailableItem();
+				}
+				else if (_queueState == QueueState.Open || itemQueue.HasAnyItem)
+				{
+					// Open: always wait. Shutdown: wait only while pending items remain.
+					blockedReader = new WaitQueueReader(this);
+					readerQueue.Enqueue(blockedReader);
+				}
+				else
+				{
+					// Shutdown and completely drained.
+					value = default(T);
+					return true;
+				}
+			}
+
+			if (blockedReader == null)
+			{
+				// An item was taken directly; GetValue() rethrows its exception, if any.
+				InvokeDequeuedCallback(dequeued.DequeuedCallback);
+				value = dequeued.GetValue();
+				return true;
+			}
+
+			return blockedReader.Wait(timeout, out value);
+		}
+
+		/// <summary>
+		/// Closes the queue via <c>Dispose(true)</c> and suppresses finalization of this instance.
+		/// </summary>
+		public void Dispose()
+		{
+			const bool calledExplicitly = true;
+
+			Dispose(calledExplicitly);
+			GC.SuppressFinalize(this);
+		}
+
+		/// <summary>
+		/// Moves the queue to the Closed state. The call that performs the transition then
+		/// releases every queued reader with an empty item and disposes every stored item,
+		/// invoking each item's dequeued callback.
+		/// </summary>
+		/// <param name="disposing">When <c>false</c> the method does nothing.</param>
+		protected void Dispose(bool disposing)
+		{
+			if (!disposing)
+			{
+				return;
+			}
+
+			bool closedByThisCall;
+
+			lock (ThisLock)
+			{
+				closedByThisCall = _queueState != QueueState.Closed;
+				if (closedByThisCall)
+				{
+					_queueState = QueueState.Closed;
+				}
+			}
+
+			if (!closedByThisCall)
+			{
+				return;
+			}
+
+			// The queues are drained without holding the lock, after the state is already Closed.
+			while (readerQueue.Count > 0)
+			{
+				IQueueReader strandedReader = readerQueue.Dequeue();
+				strandedReader.Set(default(Item));
+			}
+
+			while (itemQueue.HasAnyItem)
+			{
+				Item abandoned = itemQueue.DequeueAnyItem();
+				abandoned.Dispose();
+				InvokeDequeuedCallback(abandoned.DequeuedCallback);
+			}
+		}
+
+		/// <summary>
+		/// Makes one pending item available and, if a reader is queued, hands that reader the
+		/// next available item. Registered waiters are signaled from the thread pool.
+		/// </summary>
+		public void Dispatch()
+		{
+			Item dispatched = new Item();
+			IQueueReader target = null;
+			IQueueReader[] strandedReaders = null;
+			IQueueWaiter[] waiters = null;
+			bool itemAvailable;
+
+			lock (ThisLock)
+			{
+				// Waiters are told "item available" only while the queue is still open.
+				itemAvailable = _queueState == QueueState.Open;
+				GetWaiters(out waiters);
+
+				if (_queueState != QueueState.Closed)
+				{
+					itemQueue.MakePendingItemAvailable();
+
+					if (readerQueue.Count > 0)
+					{
+						dispatched = itemQueue.DequeueAvailableItem();
+						target = readerQueue.Dequeue();
+
+						// After shutdown, once the last item is gone the remaining readers
+						// can never be satisfied, so they are released with an empty item.
+						bool drainedAfterShutdown = _queueState == QueueState.Shutdown && itemQueue.ItemCount == 0;
+						if (drainedAfterShutdown && readerQueue.Count > 0)
+						{
+							strandedReaders = readerQueue.ToArray();
+							readerQueue.Clear();
+
+							itemAvailable = false;
+						}
+					}
+				}
+			}
+
+			if (strandedReaders != null)
+			{
+				if (completeOutstandingReadersCallback == null)
+				{
+					completeOutstandingReadersCallback = CompleteOutstandingReadersCallback;
+				}
+
+				ThreadPool.QueueUserWorkItem(completeOutstandingReadersCallback, strandedReaders);
+			}
+
+			if (waiters != null)
+			{
+				CompleteWaitersLater(itemAvailable, waiters);
+			}
+
+			if (target != null)
+			{
+				InvokeDequeuedCallback(dispatched.DequeuedCallback);
+				target.Set(dispatched);
+			}
+		}
+
+		/// <summary>
+		/// Ends an asynchronous dequeue operation started with <c>BeginDequeue</c>.
+		/// </summary>
+		/// <param name="result">The async result returned by <c>BeginDequeue</c>.</param>
+		/// <returns>The dequeued value.</returns>
+		/// <exception cref="TimeoutException">The operation timed out.</exception>
+		public T EndDequeue(IAsyncResult result)
+		{
+			T dequeued;
+			bool completedInTime = EndDequeue(result, out dequeued);
+
+			if (completedInTime)
+			{
+				return dequeued;
+			}
+
+			throw new TimeoutException("Asynchronous Dequeue operation timed out.");
+		}
+
+		/// <summary>
+		/// Ends an asynchronous dequeue operation without throwing on timeout.
+		/// </summary>
+		/// <param name="result">The async result returned by <c>BeginDequeue</c>.</param>
+		/// <param name="value">Receives the dequeued value.</param>
+		/// <returns>
+		/// <c>true</c> for a synchronously completed result; otherwise whatever
+		/// <c>AsyncQueueReader.End</c> reports (<c>false</c> when the reader expired).
+		/// </returns>
+		public bool EndDequeue(IAsyncResult result, out T value)
+		{
+			// BeginDequeue returns one of two result types; pick the matching End.
+			if (result is TypedCompletedAsyncResult<T>)
+			{
+				value = TypedCompletedAsyncResult<T>.End(result);
+				return true;
+			}
+
+			return AsyncQueueReader.End(result, out value);
+		}
+
+		/// <summary>
+		/// Ends an asynchronous wait started with <c>BeginWaitForItem</c>.
+		/// </summary>
+		/// <param name="result">The async result returned by <c>BeginWaitForItem</c>.</param>
+		/// <returns>The availability flag carried by the result.</returns>
+		public bool EndWaitForItem(IAsyncResult result)
+		{
+			// BeginWaitForItem returns one of two result types; pick the matching End.
+			if (result is TypedCompletedAsyncResult<bool>)
+			{
+				return TypedCompletedAsyncResult<bool>.End(result);
+			}
+
+			return AsyncQueueWaiter.End(result);
+		}
+
+		/// <summary>
+		/// Enqueues a value with no dequeued callback, allowing dispatch on the calling thread.
+		/// </summary>
+		public void EnqueueAndDispatch(T item)
+		{
+			ItemDequeuedCallback noCallback = null;
+			EnqueueAndDispatch(item, noCallback);
+		}
+
+		/// <summary>
+		/// Enqueues a value with a dequeued callback, allowing dispatch on the calling thread.
+		/// </summary>
+		public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback)
+		{
+			const bool canDispatchOnThisThread = true;
+			EnqueueAndDispatch(item, dequeuedCallback, canDispatchOnThisThread);
+		}
+
+		/// <summary>
+		/// Enqueues an exception item; the consumer that dequeues it observes the exception.
+		/// </summary>
+		public void EnqueueAndDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
+		{
+			Debug.Assert(exception != null, "exception parameter should not be null");
+
+			Item faultItem = new Item(exception, dequeuedCallback);
+			EnqueueAndDispatch(faultItem, canDispatchOnThisThread);
+		}
+
+		/// <summary>
+		/// Enqueues a value item, optionally forbidding dispatch on the calling thread.
+		/// </summary>
+		public void EnqueueAndDispatch(T item, ItemDequeuedCallback dequeuedCallback, bool canDispatchOnThisThread)
+		{
+			Debug.Assert(item != null, "item parameter should not be null");
+
+			Item valueItem = new Item(item, dequeuedCallback);
+			EnqueueAndDispatch(valueItem, canDispatchOnThisThread);
+		}
+
+		// Core enqueue path.
+		//  - Queue not open: the item is rejected (dequeued callback invoked, then disposed).
+		//  - No reader queued: the item is stored as available.
+		//  - Reader queued and dispatch allowed here: the item goes straight to that reader.
+		//  - Reader queued but dispatch not allowed here: the item is stored as pending and
+		//    Dispatch() is scheduled on the thread pool.
+		// Registered waiters are signaled in every case.
+		void EnqueueAndDispatch(Item item, bool canDispatchOnThisThread)
+		{
+			bool rejected = false;
+			bool dispatchLater = false;
+			IQueueReader target = null;
+			IQueueWaiter[] waiters = null;
+			bool itemAvailable;
+
+			lock (ThisLock)
+			{
+				itemAvailable = _queueState == QueueState.Open;
+				GetWaiters(out waiters);
+
+				if (_queueState != QueueState.Open)
+				{
+					// Closed or Shutdown
+					rejected = true;
+				}
+				else if (readerQueue.Count == 0)
+				{
+					itemQueue.EnqueueAvailableItem(item);
+				}
+				else if (canDispatchOnThisThread)
+				{
+					target = readerQueue.Dequeue();
+				}
+				else
+				{
+					itemQueue.EnqueuePendingItem(item);
+					dispatchLater = true;
+				}
+			}
+
+			if (waiters != null)
+			{
+				if (canDispatchOnThisThread)
+				{
+					CompleteWaiters(itemAvailable, waiters);
+				}
+				else
+				{
+					CompleteWaitersLater(itemAvailable, waiters);
+				}
+			}
+
+			if (target != null)
+			{
+				InvokeDequeuedCallback(item.DequeuedCallback);
+				target.Set(item);
+			}
+
+			if (dispatchLater)
+			{
+				if (onDispatchCallback == null)
+				{
+					onDispatchCallback = OnDispatchCallback;
+				}
+
+				ThreadPool.QueueUserWorkItem(onDispatchCallback, this);
+			}
+			else if (rejected)
+			{
+				InvokeDequeuedCallback(item.DequeuedCallback);
+				item.Dispose();
+			}
+		}
+
+		/// <summary>
+		/// Enqueues a value without dispatching it on the calling thread.
+		/// </summary>
+		/// <returns><c>true</c> when the caller must call <c>Dispatch()</c> afterwards.</returns>
+		public bool EnqueueWithoutDispatch(T item, ItemDequeuedCallback dequeuedCallback)
+		{
+			Debug.Assert(item != null, "EnqueueWithoutDispatch: item parameter should not be null");
+
+			Item valueItem = new Item(item, dequeuedCallback);
+			return EnqueueWithoutDispatch(valueItem);
+		}
+
+		/// <summary>
+		/// Enqueues an exception item without dispatching it on the calling thread.
+		/// </summary>
+		/// <returns><c>true</c> when the caller must call <c>Dispatch()</c> afterwards.</returns>
+		public bool EnqueueWithoutDispatch(Exception exception, ItemDequeuedCallback dequeuedCallback)
+		{
+			Debug.Assert(exception != null, "EnqueueWithoutDispatch: exception parameter should not be null");
+
+			Item faultItem = new Item(exception, dequeuedCallback);
+			return EnqueueWithoutDispatch(faultItem);
+		}
+
+		// Stores the item without handing it to a reader. Returns true when a reader is
+		// queued, in which case the item is stored as pending and the caller must call
+		// Dispatch() later. When the queue is not open the item is disposed, its dequeued
+		// callback is scheduled on the thread pool, and false is returned.
+		bool EnqueueWithoutDispatch(Item item)
+		{
+			lock (ThisLock)
+			{
+				if (_queueState == QueueState.Open)
+				{
+					bool readerWaiting = readerQueue.Count != 0;
+
+					if (readerWaiting)
+					{
+						itemQueue.EnqueuePendingItem(item);
+					}
+					else
+					{
+						itemQueue.EnqueueAvailableItem(item);
+					}
+
+					return readerWaiting;
+				}
+			}
+
+			// Closed or Shutdown
+			item.Dispose();
+			InvokeDequeuedCallbackLater(item.DequeuedCallback);
+			return false;
+		}
+
+		// Thread-pool entry point: runs Dispatch() on the queue passed as state.
+		static void OnDispatchCallback(object state)
+		{
+			InputQueue<T> queue = (InputQueue<T>)state;
+			queue.Dispatch();
+		}
+
+		// Schedules the dequeued callback on the thread pool; does nothing for a null callback.
+		static void InvokeDequeuedCallbackLater(ItemDequeuedCallback dequeuedCallback)
+		{
+			if (dequeuedCallback == null)
+			{
+				return;
+			}
+
+			// The WaitCallback delegate is created on first use and cached.
+			if (onInvokeDequeuedCallback == null)
+			{
+				onInvokeDequeuedCallback = OnInvokeDequeuedCallback;
+			}
+
+			ThreadPool.QueueUserWorkItem(onInvokeDequeuedCallback, dequeuedCallback);
+		}
+
+		// Invokes the dequeued callback on the calling thread; does nothing for a null callback.
+		static void InvokeDequeuedCallback(ItemDequeuedCallback dequeuedCallback)
+		{
+			if (dequeuedCallback == null)
+			{
+				return;
+			}
+
+			dequeuedCallback();
+		}
+
+		// Thread-pool entry point: the state object is the ItemDequeuedCallback to invoke.
+		static void OnInvokeDequeuedCallback(object state)
+		{
+			((ItemDequeuedCallback)state)();
+		}
+
+		// Removes the given reader from the reader queue, if it is still there.
+		// Returns true when it was found and removed. Always returns false once
+		// the queue is closed.
+		bool RemoveReader(IQueueReader reader)
+		{
+			lock (ThisLock)
+			{
+				if (_queueState == QueueState.Closed)
+				{
+					return false;
+				}
+
+				bool found = false;
+
+				// Rotate the queue exactly once: every reader is taken from the front
+				// and all but the target are put back, which keeps FIFO order intact.
+				int remaining = readerQueue.Count;
+				while (remaining > 0)
+				{
+					IQueueReader candidate = readerQueue.Dequeue();
+					if (ReferenceEquals(candidate, reader))
+					{
+						found = true;
+					}
+					else
+					{
+						readerQueue.Enqueue(candidate);
+					}
+
+					remaining--;
+				}
+
+				return found;
+			}
+		}
+
+		/// <summary>
+		/// Synchronously waits until an item is available.
+		/// </summary>
+		/// <param name="timeout">Maximum time to block.</param>
+		/// <returns>
+		/// <c>true</c> when an item is already available or the queue is closed; <c>false</c> when
+		/// the queue is shut down with nothing stored; otherwise the result of the blocking wait.
+		/// </returns>
+		public bool WaitForItem(TimeSpan timeout)
+		{
+			// NOTE(review): for a shut-down, empty queue this returns false whereas
+			// BeginWaitForItem completes with true - confirm which result is intended.
+			WaitQueueWaiter blockedWaiter;
+
+			lock (ThisLock)
+			{
+				if (_queueState == QueueState.Closed)
+				{
+					return true;
+				}
+
+				if (itemQueue.HasAvailableItem)
+				{
+					return true;
+				}
+
+				if (_queueState == QueueState.Shutdown && !itemQueue.HasAnyItem)
+				{
+					return false;
+				}
+
+				// Open with nothing available, or Shutdown with pending items only.
+				blockedWaiter = new WaitQueueWaiter();
+				waiterList.Add(blockedWaiter);
+			}
+
+			return blockedWaiter.Wait(timeout);
+		}
+
+		interface IQueueReader // A consumer queued in readerQueue, waiting to be handed one item.
+		{
+			void Set(Item item); // Called with the dispatched item, or an empty item on shutdown/close.
+		}
+
+		interface IQueueWaiter // A consumer registered in waiterList, waiting to be signaled.
+		{
+			void Set(bool itemAvailable); // Called with the availability flag computed by the signaling code.
+		}
+
+		// Reader used by the synchronous Dequeue path: the consumer thread blocks on an
+		// event until an item (or an exception) is delivered through Set.
+		class WaitQueueReader : IQueueReader
+		{
+			Exception _exception;
+			InputQueue<T> _inputQueue;
+			T _item;
+			ManualResetEvent _waitEvent;
+			object _thisLock = new object();
+
+			public WaitQueueReader(InputQueue<T> inputQueue)
+			{
+				_inputQueue = inputQueue;
+				_waitEvent = new ManualResetEvent(false);
+			}
+
+			object ThisLock
+			{
+				get
+				{
+					return _thisLock;
+				}
+			}
+
+			// Stores the delivered value/exception and wakes the blocked consumer.
+			public void Set(Item item)
+			{
+				lock (ThisLock)
+				{
+					Debug.Assert(_item == null, "InputQueue.WaitQueueReader.Set: (this.item == null)");
+					Debug.Assert(_exception == null, "InputQueue.WaitQueueReader.Set: (this.exception == null)");
+
+					_exception = item.Exception;
+					_item = item.Value;
+					_waitEvent.Set();
+				}
+			}
+
+			// Blocks until Set is called or the timeout elapses.
+			// Returns false only when the wait timed out AND this reader could still be
+			// removed from the queue. If removal fails, the reader has already been taken
+			// for delivery, so the method keeps waiting for the imminent Set call.
+			// Throws the delivered exception when the item carried one, matching what
+			// Item.GetValue() does on the non-blocking dequeue path.
+			public bool Wait(TimeSpan timeout, out T value)
+			{
+				// The event is closed only once no further Set call can reach it.
+				bool isSafeToClose = false;
+				try
+				{
+					if (timeout == TimeSpan.MaxValue)
+					{
+						_waitEvent.WaitOne();
+					}
+					else if (!_waitEvent.WaitOne(timeout, false))
+					{
+						if (_inputQueue.RemoveReader(this))
+						{
+							value = default(T);
+							isSafeToClose = true;
+							return false;
+						}
+						else
+						{
+							_waitEvent.WaitOne();
+						}
+					}
+
+					isSafeToClose = true;
+				}
+				finally
+				{
+					if (isSafeToClose)
+					{
+						_waitEvent.Close();
+					}
+				}
+
+				// Previously the stored exception was never observed, so an exception
+				// item delivered to a blocked reader was silently turned into a null value.
+				if (_exception != null)
+				{
+					throw _exception;
+				}
+
+				value = _item;
+				return true;
+			}
+		}
+
+		// Reader used by the asynchronous BeginDequeue/EndDequeue path. It completes either
+		// when an item is delivered through Set or when its timer expires while the reader
+		// is still queued.
+		class AsyncQueueReader : AsyncResult, IQueueReader
+		{
+			static TimerCallback timerCallback = new TimerCallback(AsyncQueueReader.TimerCallback);
+
+			bool _expired;
+			InputQueue<T> _inputQueue;
+			T _item;
+			Timer _timer;
+
+			public AsyncQueueReader(InputQueue<T> inputQueue, TimeSpan timeout, AsyncCallback callback, object state)
+				: base(callback, state)
+			{
+				_inputQueue = inputQueue;
+
+				// An infinite timeout needs no timer at all.
+				if (timeout == TimeSpan.MaxValue)
+				{
+					return;
+				}
+
+				// One-shot timer: the period of -1 ms disables periodic signaling.
+				_timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
+			}
+
+			// Ends the operation. Returns false (with a default value) when the reader expired.
+			public static bool End(IAsyncResult result, out T value)
+			{
+				AsyncQueueReader completedReader = AsyncResult.End<AsyncQueueReader>(result);
+				bool timedOut = completedReader._expired;
+
+				value = timedOut ? default(T) : completedReader._item;
+				return !timedOut;
+			}
+
+			// Timer expiry: the reader is completed as expired only if it could still be
+			// removed from the queue, i.e. it has not been picked for delivery.
+			static void TimerCallback(object state)
+			{
+				AsyncQueueReader expiredReader = (AsyncQueueReader)state;
+				bool stillQueued = expiredReader._inputQueue.RemoveReader(expiredReader);
+
+				if (stillQueued)
+				{
+					expiredReader._expired = true;
+					expiredReader.Complete(false);
+				}
+			}
+
+			// Delivers the item: records the value, stops the timer, then completes the
+			// result with the item's exception (null when there is none).
+			public void Set(Item item)
+			{
+				_item = item.Value;
+
+				Timer pendingTimer = _timer;
+				if (pendingTimer != null)
+				{
+					pendingTimer.Change(-1, -1);
+				}
+
+				Complete(false, item.Exception);
+			}
+		}
+
+		// A queue entry: either a value or an exception, plus an optional callback that is
+		// invoked when the entry leaves the queue.
+		internal struct Item
+		{
+			private T _value;
+			private Exception _exception;
+			ItemDequeuedCallback _dequeuedCallback;
+
+			/// <summary>
+			/// Creates an item that carries a value and no exception.
+			/// </summary>
+			/// <param name="value">The value.</param>
+			/// <param name="dequeuedCallback">The dequeued callback.</param>
+			public Item(T value, ItemDequeuedCallback dequeuedCallback) : this(value, null, dequeuedCallback)
+			{
+			}
+
+			/// <summary>
+			/// Creates an item that carries an exception and no value.
+			/// </summary>
+			/// <param name="exception">The exception.</param>
+			/// <param name="dequeuedCallback">The dequeued callback.</param>
+			public Item(Exception exception, ItemDequeuedCallback dequeuedCallback) : this(null, exception, dequeuedCallback)
+			{
+			}
+
+			/// <summary>
+			/// Creates an item from all three parts; the other constructors delegate here.
+			/// </summary>
+			/// <param name="value">The value.</param>
+			/// <param name="exception">The exception.</param>
+			/// <param name="dequeuedCallback">The dequeued callback.</param>
+			internal Item(T value, Exception exception, ItemDequeuedCallback dequeuedCallback) 
+			{
+				_dequeuedCallback = dequeuedCallback;
+				_exception = exception;
+				_value = value;
+			}
+
+			/// <summary>
+			/// Gets the exception carried by this item, or null.
+			/// </summary>
+			public Exception Exception
+			{
+				get
+				{
+					return _exception;
+				}
+			}
+
+			/// <summary>
+			/// Gets the raw value without checking for an exception.
+			/// </summary>
+			public T Value
+			{
+				get
+				{
+					return _value;
+				}
+			}
+
+			/// <summary>
+			/// Gets the callback to invoke when this item is dequeued, or null.
+			/// </summary>
+			public ItemDequeuedCallback DequeuedCallback
+			{
+				get
+				{
+					return _dequeuedCallback;
+				}
+			}
+
+			/// <summary>
+			/// Releases the carried value: disposes it when it is <see cref="IDisposable"/>,
+			/// otherwise aborts it when it is an <see cref="ICommunicationObject"/>.
+			/// Does nothing when there is no value.
+			/// </summary>
+			public void Dispose()
+			{
+				if (_value == null)
+				{
+					return;
+				}
+
+				IDisposable disposable = _value as IDisposable;
+				if (disposable != null)
+				{
+					disposable.Dispose();
+					return;
+				}
+
+				ICommunicationObject communicationObject = _value as ICommunicationObject;
+				if (communicationObject != null)
+				{
+					communicationObject.Abort();
+				}
+			}
+
+			/// <summary>
+			/// Gets the value, throwing the carried exception instead when there is one.
+			/// </summary>
+			/// <returns>The carried value.</returns>
+			public T GetValue()
+			{
+				if (_exception == null)
+				{
+					return _value;
+				}
+
+				throw _exception;
+			}
+		}
+
+		// Waiter used by the synchronous WaitForItem path: the consumer thread blocks on an
+		// event until the queue signals it through Set.
+		internal class WaitQueueWaiter : IQueueWaiter
+		{
+			bool _itemAvailable;
+			ManualResetEvent _waitEvent;
+			object _thisLock = new object();
+
+			/// <summary>
+			/// Creates a waiter whose event starts out unsignaled.
+			/// </summary>
+			public WaitQueueWaiter()
+			{
+				_waitEvent = new ManualResetEvent(false);
+			}
+
+			/// <summary>
+			/// Gets the object that guards this waiter's state.
+			/// </summary>
+			object ThisLock
+			{
+				get
+				{
+					return _thisLock;
+				}
+			}
+
+			/// <summary>
+			/// Records the availability flag and wakes the blocked thread.
+			/// </summary>
+			/// <param name="itemAvailable">The flag that <see cref="Wait"/> will return.</param>
+			public void Set(bool itemAvailable)
+			{
+				lock (ThisLock)
+				{
+					_itemAvailable = itemAvailable;
+					_waitEvent.Set();
+				}
+			}
+
+			/// <summary>
+			/// Blocks until signaled or until the timeout elapses.
+			/// </summary>
+			/// <param name="timeout">Maximum time to block; <see cref="TimeSpan.MaxValue"/> blocks indefinitely.</param>
+			/// <returns><c>false</c> on timeout; otherwise the flag passed to <see cref="Set"/>.</returns>
+			public bool Wait(TimeSpan timeout)
+			{
+				bool signaled = true;
+
+				if (timeout == TimeSpan.MaxValue)
+				{
+					_waitEvent.WaitOne();
+				}
+				else
+				{
+					signaled = _waitEvent.WaitOne(timeout, false);
+				}
+
+				return signaled && _itemAvailable;
+			}
+		}
+
+		// Waiter used by the asynchronous BeginWaitForItem/EndWaitForItem path. It completes
+		// either when the queue signals it through Set or when its timer expires, whichever
+		// happens first; the loser of that race is ignored.
+		internal class AsyncQueueWaiter : AsyncResult, IQueueWaiter
+		{
+			static TimerCallback timerCallback = new TimerCallback(AsyncQueueWaiter.TimerCallback);
+			Timer _timer;
+			bool _itemAvailable;
+			// Set (under ThisLock) by whichever of TimerCallback/Set gets to complete the result.
+			bool _completed;
+			object _thisLock = new object();
+
+			/// <summary>
+			/// Initializes a new instance of the <see cref="InputQueue&lt;T&gt;.AsyncQueueWaiter"/> class.
+			/// </summary>
+			/// <param name="timeout">The timeout; <see cref="TimeSpan.MaxValue"/> means no timer is created.</param>
+			/// <param name="callback">The callback.</param>
+			/// <param name="state">The state.</param>
+			public AsyncQueueWaiter(TimeSpan timeout, AsyncCallback callback, object state) : base(callback, state)
+			{
+				if (timeout != TimeSpan.MaxValue)
+				{
+					// One-shot timer: the period of -1 ms disables periodic signaling.
+					_timer = new Timer(timerCallback, this, timeout, TimeSpan.FromMilliseconds(-1));
+				}
+			}
+
+			/// <summary>
+			/// Gets the object that guards this waiter's state.
+			/// </summary>
+			object ThisLock
+			{
+				get
+				{
+					return _thisLock;
+				}
+			}
+
+			/// <summary>
+			/// Ends the wait and returns the recorded availability flag
+			/// (<c>false</c> when the waiter timed out).
+			/// </summary>
+			/// <param name="result">The result.</param>
+			/// <returns>The availability flag.</returns>
+			public static bool End(IAsyncResult result)
+			{
+				AsyncQueueWaiter waiterResult = AsyncResult.End<AsyncQueueWaiter>(result);
+				return waiterResult._itemAvailable;
+			}
+
+			/// <summary>
+			/// Callback that is invoked when the timer expires. Completes the waiter
+			/// unless <see cref="Set"/> has already done so.
+			/// </summary>
+			/// <param name="state">The waiter whose timer expired.</param>
+			public static void TimerCallback(object state)
+			{
+				AsyncQueueWaiter thisPtr = (AsyncQueueWaiter)state;
+
+				lock (thisPtr.ThisLock)
+				{
+					if (thisPtr._completed)
+					{
+						return;
+					}
+
+					thisPtr._completed = true;
+				}
+
+				thisPtr.Complete(false);
+			}
+
+			/// <summary>
+			/// Signals the waiter. Ignored when the waiter has already timed out.
+			/// </summary>
+			/// <param name="itemAvailable">The flag that <see cref="End"/> will return.</param>
+			public void Set(bool itemAvailable)
+			{
+				lock (ThisLock)
+				{
+					// Timer.Change reports success even after a one-shot timer has fired, so
+					// it cannot tell whether the timeout already completed this result. A
+					// timed-out waiter also stays in the queue's waiter list and is signaled
+					// again later. The explicit flag prevents the second completion.
+					if (_completed)
+					{
+						return;
+					}
+
+					_completed = true;
+					_itemAvailable = itemAvailable;
+
+					if (_timer != null)
+					{
+						_timer.Change(-1, -1);
+					}
+				}
+
+				Complete(false);
+			}
+		}
+
+		// Growable circular buffer of items. Every stored item counts towards the total;
+		// pending items are counted separately and are not handed out by
+		// DequeueAvailableItem until MakePendingItemAvailable lowers the pending count.
+		internal class ItemQueue
+		{
+			// Backing array, indexed modulo its length.
+			Item[] _items;
+			// Index of the oldest stored item.
+			int _head;
+			// Number of stored items that are not available yet.
+			int _pendingCount;
+			// Number of stored items, pending and available.
+			int _totalCount;
+
+			/// <summary>
+			/// Creates an empty queue with an initial capacity of one item.
+			/// </summary>
+			public ItemQueue()
+			{
+				_items = new Item[1];
+			}
+
+			/// <summary>
+			/// Removes and returns the oldest item; fails when every stored item is pending.
+			/// </summary>
+			/// <returns>The dequeued item.</returns>
+			public Item DequeueAvailableItem()
+			{
+				if (_totalCount != _pendingCount)
+				{
+					return DequeueItemCore();
+				}
+
+				throw new Exception("Internal Error - ItemQueue does not contain any available items");
+			}
+
+			/// <summary>
+			/// Removes and returns the oldest item whether or not it is available.
+			/// </summary>
+			/// <returns>The dequeued item.</returns>
+			public Item DequeueAnyItem()
+			{
+				// When everything left is pending, the item being removed is a pending one.
+				bool onlyPendingLeft = _pendingCount == _totalCount;
+				if (onlyPendingLeft)
+				{
+					_pendingCount--;
+				}
+
+				return DequeueItemCore();
+			}
+
+			/// <summary>
+			/// Appends an item at the tail, doubling the backing array when it is full.
+			/// </summary>
+			/// <param name="item">The item.</param>
+			void EnqueueItemCore(Item item)
+			{
+				int capacity = _items.Length;
+
+				if (_totalCount == capacity)
+				{
+					// Copy into a larger array, unwrapping the ring so the oldest item lands at index 0.
+					Item[] grown = new Item[capacity * 2];
+					for (int offset = 0; offset < _totalCount; offset++)
+					{
+						grown[offset] = _items[(_head + offset) % capacity];
+					}
+
+					_items = grown;
+					_head = 0;
+				}
+
+				_items[(_head + _totalCount) % _items.Length] = item;
+				_totalCount++;
+			}
+
+			/// <summary>
+			/// Removes and returns the item at the head; fails when the queue is empty.
+			/// </summary>
+			/// <returns>The dequeued item.</returns>
+			Item DequeueItemCore()
+			{
+				if (_totalCount == 0)
+				{
+					throw new Exception("Internal Error - ItemQueue does not contain any items");
+				}
+
+				int slot = _head;
+				Item oldest = _items[slot];
+
+				// Clear the slot so the buffer no longer references the removed item.
+				_items[slot] = default(Item);
+				_totalCount--;
+				_head = (slot + 1) % _items.Length;
+
+				return oldest;
+			}
+
+			/// <summary>
+			/// Stores an item and counts it as pending.
+			/// </summary>
+			/// <param name="item">The item.</param>
+			public void EnqueuePendingItem(Item item)
+			{
+				EnqueueItemCore(item);
+				_pendingCount++;
+			}
+
+			/// <summary>
+			/// Stores an item that is available immediately.
+			/// </summary>
+			/// <param name="item">The item.</param>
+			public void EnqueueAvailableItem(Item item)
+			{
+				EnqueueItemCore(item);
+			}
+
+			/// <summary>
+			/// Turns one pending item into an available one; fails when nothing is pending.
+			/// </summary>
+			public void MakePendingItemAvailable()
+			{
+				if (_pendingCount != 0)
+				{
+					_pendingCount--;
+					return;
+				}
+
+				throw new Exception("Internal Error - ItemQueue does not contain any pending items");
+			}
+
+			/// <summary>
+			/// Gets a value indicating whether at least one stored item is not pending.
+			/// </summary>
+			public bool HasAvailableItem
+			{
+				get
+				{
+					return _totalCount > _pendingCount;
+				}
+			}
+
+			/// <summary>
+			/// Gets a value indicating whether any item, pending or available, is stored.
+			/// </summary>
+			public bool HasAnyItem
+			{
+				get
+				{
+					return _totalCount > 0;
+				}
+			}
+
+			/// <summary>
+			/// Gets the number of stored items, pending and available.
+			/// </summary>
+			public int ItemCount
+			{
+				get
+				{
+					return _totalCount;
+				}
+			}
+		}
+	}
+}
\ No newline at end of file

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsAsyncResult.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsAsyncResult.cs?rev=709503&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsAsyncResult.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsAsyncResult.cs Fri Oct 31 10:43:10 2008
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Asynchronous result that encodes a message, starts a send on an
+	/// <see cref="NmsOutputChannel"/> and returns the encoded buffer to the
+	/// channel's buffer manager once the send has ended.
+	/// </summary>
+	internal class NmsAsyncResult : AsyncResult
+	{
+		private ArraySegment<byte> _messageBuffer;
+		private readonly NmsOutputChannel _channel;
+
+		/// <summary>
+		/// Initializes a new instance of the <see cref="NmsAsyncResult"/> class,
+		/// encodes the message and begins the send.
+		/// </summary>
+		/// <param name="channel">The channel used to encode and send the message.</param>
+		/// <param name="message">The message to send.</param>
+		/// <param name="callback">The callback handed to the base <see cref="AsyncResult"/>.</param>
+		/// <param name="state">The state handed to the base <see cref="AsyncResult"/>.</param>
+		public NmsAsyncResult(NmsOutputChannel channel, Message message, AsyncCallback callback, object state) : base(callback, state)
+		{
+			_channel = channel;
+			// NOTE(review): the encoded buffer is only ever returned to the buffer manager in this
+			// class; it is not passed to BeginSend. Confirm that encoding here and then sending the
+			// same Message instance is intended.
+			_messageBuffer = _channel.EncodeMessage(message);
+
+			try
+			{
+				IAsyncResult result = _channel.BeginSend(message, new AsyncCallback(OnSend), this);
+				if (!result.CompletedSynchronously)
+				{
+					// OnSend finishes the operation when the send completes asynchronously.
+					return;
+				}
+
+				CompleteSend(result, true);
+			}
+			catch (Exception)
+			{
+				// CleanupBuffer is a no-op when the buffer has already been returned.
+				CleanupBuffer();
+				throw;
+			}
+		}
+
+		/// <summary>
+		/// Returns the encoded buffer to the channel's buffer manager, if it is still held,
+		/// and resets the field so that a second call does nothing.
+		/// </summary>
+		private void CleanupBuffer()
+		{
+			if (_messageBuffer.Array != null)
+			{
+				_channel.BufferManager.ReturnBuffer(_messageBuffer.Array);
+				_messageBuffer = new ArraySegment<byte>();
+			}
+		}
+
+		/// <summary>
+		/// Ends the send, returns the buffer and completes this result.
+		/// </summary>
+		/// <param name="result">The result returned by the channel's BeginSend.</param>
+		/// <param name="synchronous">Value passed on to <c>Complete</c>.</param>
+		internal void CompleteSend(IAsyncResult result, bool synchronous)
+		{
+			try
+			{
+				_channel.EndSend(result);
+			}
+			finally
+			{
+				// Return the buffer even when EndSend throws. Without this the asynchronous
+				// path (OnSend) would never hand the buffer back after a failed send.
+				CleanupBuffer();
+			}
+
+			Complete(synchronous);
+		}
+
+
+		/// <summary>
+		/// Callback given to the channel's BeginSend. Does nothing when the send
+		/// completed synchronously, because the constructor handles that case.
+		/// </summary>
+		/// <param name="result">The result of the send operation.</param>
+		internal void OnSend(IAsyncResult result)
+		{
+			if (result.CompletedSynchronously)
+			{
+				return;
+			}
+
+			try
+			{
+				CompleteSend(result, false);
+			}
+			catch (Exception e)
+			{
+				// Hand the failure to the base class instead of throwing on the callback thread.
+				Complete(false, e);
+			}
+		}
+
+		/// <summary>
+		/// Ends the operation represented by <paramref name="result"/> by delegating to
+		/// the base class <c>End</c> helper.
+		/// </summary>
+		/// <param name="result">The result to end.</param>
+		public static void End(IAsyncResult result)
+		{
+			End<NmsAsyncResult>(result);
+		}
+	}
+}
\ No newline at end of file

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelBase.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelBase.cs?rev=709503&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelBase.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelBase.cs Fri Oct 31 10:43:10 2008
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.ServiceModel.Channels;
+using System.Text;
+using System.ServiceModel;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Common base for the NMS channels. Holds the buffer manager, message encoder,
+	/// remote address and destination settings supplied at construction.
+	/// </summary>
+	public abstract class NmsChannelBase : ChannelBase
+	{
+		#region Private members
+
+		private readonly EndpointAddress _address;
+		private readonly BufferManager _bufferManager;
+		private readonly MessageEncoder _encoder;
+		private readonly string _destination;
+		private readonly DestinationType _destinationType;
+
+		#endregion
+
+		#region Constructors
+
+		/// <summary>
+		/// Initializes a new instance of the <see cref="NmsChannelBase"/> class.
+		/// </summary>
+		/// <param name="bufferManager">The buffer manager exposed through <see cref="BufferManager"/>.</param>
+		/// <param name="encoderFactory">The factory used to create the session encoder exposed through <see cref="Encoder"/>.</param>
+		/// <param name="address">The address exposed through <see cref="RemoteAddress"/>.</param>
+		/// <param name="parent">The channel manager passed to the base class.</param>
+		/// <param name="destination">The destination name.</param>
+		/// <param name="destinationType">The type of the destination.</param>
+		public NmsChannelBase(BufferManager bufferManager, MessageEncoderFactory encoderFactory, EndpointAddress address, ChannelManagerBase parent, string destination, DestinationType destinationType) : base(parent)
+		{
+			_bufferManager = bufferManager;
+			_encoder = encoderFactory.CreateSessionEncoder();
+			_address = address;
+			_destination = destination;
+			_destinationType = destinationType;
+		}
+
+		#endregion
+
+		#region Public properties
+
+		/// <summary>
+		/// Gets the remote address supplied at construction.
+		/// </summary>
+		/// <value>The remote address.</value>
+		public EndpointAddress RemoteAddress
+		{
+			get
+			{
+				return _address;
+			}
+		}
+
+		/// <summary>
+		/// Gets the buffer manager supplied at construction.
+		/// </summary>
+		/// <value>The buffer manager.</value>
+		public BufferManager BufferManager
+		{
+			get
+			{
+				return _bufferManager;
+			}
+		}
+
+		/// <summary>
+		/// Gets the session encoder created from the encoder factory at construction.
+		/// </summary>
+		/// <value>The encoder.</value>
+		public MessageEncoder Encoder
+		{
+			get
+			{
+				return _encoder;
+			}
+		}
+
+		/// <summary>
+		/// Gets the destination name supplied at construction.
+		/// </summary>
+		/// <value>The destination.</value>
+		public string Destination
+		{
+			get
+			{
+				return _destination;
+			}
+		}
+
+		/// <summary>
+		/// Gets the destination type supplied at construction.
+		/// </summary>
+		/// <value>The type of the destination.</value>
+		public DestinationType DestinationType
+		{
+			get
+			{
+				return _destinationType;
+			}
+		}
+
+		#endregion
+	}
+}
\ No newline at end of file

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs?rev=709503&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelFactory.cs Fri Oct 31 10:43:10 2008
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.ObjectModel;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Channel factory that creates <see cref="NmsOutputChannel"/> instances and owns
+	/// the buffer manager and message encoder factory they share.
+	/// </summary>
+	public class NmsChannelFactory : ChannelFactoryBase<IOutputChannel>
+	{
+		#region Constructors
+
+		/// <summary>
+		/// Initializes a new instance of the <see cref="NmsChannelFactory"/> class.
+		/// </summary>
+		/// <param name="transportElement">The transport binding element supplying the buffer pool size, destination and destination type.</param>
+		/// <param name="context">The binding context supplying the binding and the optional message encoding binding element.</param>
+		/// <exception cref="InvalidOperationException">More than one message encoding binding element is present in the binding parameters.</exception>
+		internal NmsChannelFactory(NmsTransportBindingElement transportElement, BindingContext context) : base(context.Binding)
+		{
+			Collection<MessageEncodingBindingElement> messageEncoderBindingElements = context.BindingParameters.FindAll<MessageEncodingBindingElement>();
+			if (messageEncoderBindingElements.Count > 1)
+			{
+				throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext");
+			}
+			// Fall back to the default encoder factory when the binding supplies none.
+			_encoderFactory = (messageEncoderBindingElements.Count == 0)
+				? NmsConstants.DefaultMessageEncoderFactory
+				: messageEncoderBindingElements[0].CreateMessageEncoderFactory();
+
+			_bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, Int32.MaxValue);
+			_destination = transportElement.Destination;
+			_destinationType = transportElement.DestinationType;
+
+			Console.WriteLine("Destination ({0}) : {1}", _destinationType, _destination);
+		}
+
+		#endregion
+
+		#region Implementation of ChannelFactoryBase
+
+		/// <summary>
+		/// Synchronous open. Only validates the timeout; no other work is performed.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override void OnOpen(TimeSpan timeout)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+		}
+
+		/// <summary>
+		/// Asynchronous open. Validates the timeout and returns an already completed result.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on open operation. 
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on open operation.</param>
+		/// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			// Same validation as the synchronous OnOpen, so both paths honour the documented exception.
+			NmsChannelHelper.ValidateTimeout(timeout);
+			return new CompletedAsyncResult(callback, state);
+		}
+
+		/// <summary>
+		/// Completes the asynchronous open started by <c>OnBeginOpen</c>.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <c>OnBeginOpen</c>.</param>
+		protected override void OnEndOpen(IAsyncResult result)
+		{
+			CompletedAsyncResult.End(result);
+		}
+
+		/// <summary>
+		/// Creates a new <see cref="NmsOutputChannel"/> that uses this factory's buffer manager and encoder factory.
+		/// </summary>
+		/// <returns>
+		/// An NMS channel with the specified addresses.
+		/// </returns>
+		/// <param name="address">The <see cref="T:System.ServiceModel.EndpointAddress" /> of the remote endpoint to which the channel sends messages.</param>
+		/// <param name="via">The <see cref="T:System.Uri" /> that contains the transport address to which messages are sent on the output channel.</param>
+		protected override IOutputChannel OnCreateChannel(EndpointAddress address, Uri via)
+		{
+			return new NmsOutputChannel(BufferManager, MessageEncoderFactory, address, this, via);
+		}
+
+		#endregion
+
+		/// <summary>
+		/// Runs the base class closed handling and then clears the buffer manager.
+		/// </summary>
+		protected override void OnClosed()
+		{
+			base.OnClosed();
+			_bufferManager.Clear();
+		}
+
+		/// <summary>
+		/// Gets the buffer manager created by this factory.
+		/// </summary>
+		public BufferManager BufferManager
+		{
+			get { return _bufferManager; }
+		}
+
+		/// <summary>
+		/// Gets the message encoder factory selected at construction.
+		/// </summary>
+		public MessageEncoderFactory MessageEncoderFactory
+		{
+			get { return _encoderFactory; }
+		}
+
+		/// <summary>
+		/// Gets the destination name taken from the transport binding element.
+		/// </summary>
+		/// <value>The destination.</value>
+		public string Destination
+		{
+			get { return _destination; }
+		}
+
+		/// <summary>
+		/// Gets the destination type taken from the transport binding element.
+		/// </summary>
+		/// <value>The type of the destination.</value>
+		public DestinationType DestinationType
+		{
+			get { return _destinationType; }
+		}
+
+		#region Private members
+
+		private readonly BufferManager _bufferManager;
+		private readonly MessageEncoderFactory _encoderFactory;
+		private readonly string _destination;
+		private readonly DestinationType _destinationType;
+
+		#endregion
+	}
+}
\ No newline at end of file

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs?rev=709503&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelHelper.cs Fri Oct 31 10:43:10 2008
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Static helpers shared by the NMS transport classes.
+	/// </summary>
+	internal class NmsChannelHelper
+	{
+		/// <summary>
+		/// Throws when <paramref name="timeout"/> is negative; otherwise does nothing.
+		/// </summary>
+		/// <param name="timeout">The timeout to check.</param>
+		/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than <see cref="TimeSpan.Zero"/>.</exception>
+		public static void ValidateTimeout(TimeSpan timeout)
+		{
+			if (timeout >= TimeSpan.Zero)
+			{
+				return;
+			}
+
+			throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue.");
+		}
+
+		/// <summary>
+		/// Returns the local path of <paramref name="uri"/> with leading '/' characters removed.
+		/// </summary>
+		/// <param name="uri">The URI of the message queue.</param>
+		public static string GetQueueName(Uri uri)
+		{
+			string localPath = uri.LocalPath;
+			return localPath.TrimStart('/');
+		}
+
+		/// <summary>
+		/// Obtains a destination from <paramref name="session"/> according to <paramref name="destinationType"/>.
+		/// </summary>
+		/// <param name="session">The session that supplies the destination.</param>
+		/// <param name="destination">The destination name; only used for topics and queues.</param>
+		/// <param name="destinationType">The kind of destination. Any value other than topic, temporary queue or temporary topic yields a queue.</param>
+		public static IDestination GetDestination(NMS.ISession session, string destination, DestinationType destinationType)
+		{
+			if (destinationType == DestinationType.Topic)
+			{
+				return session.GetTopic(destination);
+			}
+
+			// Temporary destinations are created by the session; the name is not used for them.
+			if (destinationType == DestinationType.TemporaryQueue)
+			{
+				return session.CreateTemporaryQueue();
+			}
+
+			if (destinationType == DestinationType.TemporaryTopic)
+			{
+				return session.CreateTemporaryTopic();
+			}
+
+			return session.GetQueue(destination);
+		}
+	}
+}
\ No newline at end of file

Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelListener.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelListener.cs?rev=709503&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelListener.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsChannelListener.cs Fri Oct 31 10:43:10 2008
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+using System.Text;
+using Apache.NMS;
+using ISession = Apache.NMS.ISession;
+
+namespace Apache.NMS.WCF
+{
+	/// <summary>
+	/// Channel listener for messages.
+	/// </summary>
+	public class NmsChannelListener : ChannelListenerBase<IInputChannel>
+	{
+		#region Constructors
+
+		/// <summary>
+		/// Creates a listener configured from the transport binding element and the binding context.
+		/// </summary>
+		/// <param name="transportElement">Supplies the buffer pool size, maximum received message size, destination name and destination type.</param>
+		/// <param name="context">Supplies the binding, the optional message encoding binding element and the parts of the listen URI.</param>
+		internal NmsChannelListener(NmsTransportBindingElement transportElement, BindingContext context) : base(context.Binding)
+		{
+			_bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, (int)transportElement.MaxReceivedMessageSize);
+
+			// Take the encoding element out of the binding parameters; use the default factory when there is none.
+			MessageEncodingBindingElement encodingElement = context.BindingParameters.Remove<MessageEncodingBindingElement>();
+			if (encodingElement == null)
+			{
+				_messageEncoderFactory = NmsConstants.DefaultMessageEncoderFactory;
+			}
+			else
+			{
+				_messageEncoderFactory = encodingElement.CreateMessageEncoderFactory();
+			}
+
+			_channelQueue = new InputQueue<IInputChannel>();
+			_currentChannelLock = new object();
+			_destinationName = transportElement.Destination;
+			_destinationType = transportElement.DestinationType;
+			_uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
+			Console.WriteLine("Listening to {0} at {1}/{2}", _destinationType, _uri, _destinationName);
+		}
+
+		#endregion
+
+		/// <summary>
+		/// Gets the message encoder factory selected in the constructor.
+		/// </summary>
+		/// <value>The message encoder factory.</value>
+		public MessageEncoderFactory MessageEncoderFactory
+		{
+			get
+			{
+				return _messageEncoderFactory;
+			}
+		}
+
+		/// <summary>
+		/// Gets or sets the destination name.
+		/// </summary>
+		/// <value>The destination name.</value>
+		public string Destination
+		{
+			get
+			{
+				return _destinationName;
+			}
+			set
+			{
+				_destinationName = value;
+			}
+		}
+
+		/// <summary>
+		/// Gets or sets the destination type.
+		/// </summary>
+		/// <value>The destination type.</value>
+		public DestinationType DestinationType
+		{
+			get
+			{
+				return _destinationType;
+			}
+			set
+			{
+				_destinationType = value;
+			}
+		}
+
+		#region Implementation of CommunicationObject
+
+		/// <summary>
+		/// Handles an abort by running the synchronous close logic with a zero timeout.
+		/// </summary>
+		/// <remarks>
+		/// Abort may be called at any time, including before the listener was opened
+		/// successfully, so the resources being released may not exist yet.
+		/// </remarks>
+		protected override void OnAbort()
+		{
+			TimeSpan noWait = TimeSpan.Zero;
+			OnClose(noWait);
+		}
+
+		/// <summary>
+		/// Closes the consumer, session and connection (each only when it exists) and then
+		/// closes the channel queue, all while holding <c>ThisLock</c>.
+		/// </summary>
+		/// <remarks>
+		/// Each step runs even when an earlier step throws, so a failure while closing the
+		/// consumer cannot leave the session, the connection or the channel queue open. An
+		/// exception from a later step replaces one raised by an earlier step.
+		/// </remarks>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out. It is not used by this implementation.</param>
+		protected override void OnClose(TimeSpan timeout)
+		{
+			lock (ThisLock)
+			{
+				try
+				{
+					if (_consumer != null)
+					{
+						Console.WriteLine("Listener is terminating consumer...");
+						_consumer.Close();
+						_consumer.Dispose();
+						Console.WriteLine("Listener has terminated consumer");
+					}
+				}
+				finally
+				{
+					try
+					{
+						if (_session != null)
+						{
+							Console.WriteLine("Listener is terminating session...");
+							_session.Close();
+							Console.WriteLine("Listener has terminated session");
+						}
+					}
+					finally
+					{
+						try
+						{
+							if (_connection != null)
+							{
+								Console.WriteLine("Listener is terminating connection...");
+								_connection.Stop();
+								_connection.Close();
+								_connection.Dispose();
+								Console.WriteLine("Listener has terminated connection");
+							}
+						}
+						finally
+						{
+							// Always close the queue, whatever happened above.
+							_channelQueue.Close();
+						}
+					}
+				}
+			}
+		}
+
+		/// <summary>
+		/// Asynchronous close. Performs the close synchronously through <c>OnClose</c> and
+		/// returns an already completed result.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult" /> that references the close operation.
+		/// </returns>
+		/// <param name="timeout">The timeout forwarded to <c>OnClose</c>.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate passed to the completed result.</param>
+		/// <param name="state">An object, specified by the application, passed to the completed result.</param>
+		protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			OnClose(timeout);
+
+			// The result is only created once the close has finished.
+			IAsyncResult completed = new CompletedAsyncResult(callback, state);
+			return completed;
+		}
+
+		/// <summary>
+		/// Completes the asynchronous close started by <c>OnBeginClose</c>.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <c>OnBeginClose</c>.</param>
+		protected override void OnEndClose(IAsyncResult result)
+		{
+			CompletedAsyncResult.End(result);
+		}
+
+		/// <summary>
+		/// Synchronous open. Checks that the listen URI is set and that the timeout is not negative.
+		/// </summary>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <exception cref="T:System.InvalidOperationException">The listen URI is <see langword="null"/>.</exception>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override void OnOpen(TimeSpan timeout)
+		{
+			if (Uri != null)
+			{
+				NmsChannelHelper.ValidateTimeout(timeout);
+				return;
+			}
+
+			throw new InvalidOperationException("Uri must be set before ChannelListener is opened.");
+		}
+
+		/// <summary>
+		/// Asynchronous open. Validates the timeout, performs the open synchronously through
+		/// <c>OnOpen</c> and returns an already completed result.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult" /> that references the open operation.
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate passed to the completed result.</param>
+		/// <param name="state">An object, specified by the application, passed to the completed result.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			// The timeout is checked before OnOpen, which checks the URI first.
+			NmsChannelHelper.ValidateTimeout(timeout);
+			OnOpen(timeout);
+
+			IAsyncResult completed = new CompletedAsyncResult(callback, state);
+			return completed;
+		}
+
+		/// <summary>
+		/// Completes the asynchronous open started by <c>OnBeginOpen</c>.
+		/// </summary>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <c>OnBeginOpen</c>.</param>
+		protected override void OnEndOpen(IAsyncResult result)
+		{
+			CompletedAsyncResult.End(result);
+		}
+
+		#endregion
+
+		#region Implementation of ChannelListenerBase
+
+		/// <summary>
+		/// Gets the URI on which this listener listens for incoming channels.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.Uri" /> stored by the listener.
+		/// </returns>
+		public override Uri Uri
+		{
+			get
+			{
+				return _uri;
+			}
+		}
+
+		/// <summary>
+		/// Validates the timeout and waits on the channel queue for an item.
+		/// </summary>
+		/// <returns>
+		/// The value returned by the channel queue's <c>WaitForItem</c>.
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long to wait for a channel.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override bool OnWaitForChannel(TimeSpan timeout)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+
+			bool waitResult = _channelQueue.WaitForItem(timeout);
+			return waitResult;
+		}
+
+		/// <summary>
+		/// Validates the timeout and starts an asynchronous wait on the channel queue.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult" /> returned by the channel queue's <c>BeginWaitForItem</c>.
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long to wait for a channel.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate forwarded to the queue.</param>
+		/// <param name="state">An object, specified by the application, forwarded to the queue.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+
+			IAsyncResult pendingWait = _channelQueue.BeginWaitForItem(timeout, callback, state);
+			return pendingWait;
+		}
+
+		/// <summary>
+		/// Completes the asynchronous wait started by <c>OnBeginWaitForChannel</c>.
+		/// </summary>
+		/// <returns>
+		/// The value returned by the channel queue's <c>EndWaitForItem</c>.
+		/// </returns>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <c>OnBeginWaitForChannel</c>.</param>
+		protected override bool OnEndWaitForChannel(IAsyncResult result)
+		{
+			bool waitResult = _channelQueue.EndWaitForItem(result);
+			return waitResult;
+		}
+
+		/// <summary>
+		/// Accepts a channel synchronously by dequeuing it from the channel queue.
+		/// </summary>
+		/// <returns>
+		/// The channel taken from the queue.
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		/// <exception cref="T:System.TimeoutException">The queue's <c>Dequeue</c> returned <see langword="false"/>.</exception>
+		protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
+		{
+			Console.WriteLine("Accepting channel");
+			NmsChannelHelper.ValidateTimeout(timeout);
+
+			// Only make sure a channel is available while the listener is not disposed.
+			if (!IsDisposed)
+			{
+				EnsureChannelAvailable();
+			}
+
+			IInputChannel accepted;
+			if (!_channelQueue.Dequeue(timeout, out accepted))
+			{
+				throw new TimeoutException(String.Format("Accept on listener at address {0} timed out after {1}.", Uri.AbsoluteUri, timeout));
+			}
+
+			return accepted;
+		}
+
+		/// <summary>
+		/// Starts accepting a channel asynchronously by beginning a dequeue on the channel queue.
+		/// </summary>
+		/// <returns>
+		/// The <see cref="T:System.IAsyncResult" /> returned by the channel queue's <c>BeginDequeue</c>.
+		/// </returns>
+		/// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+		/// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate forwarded to the queue.</param>
+		/// <param name="state">An object, specified by the application, forwarded to the queue.</param>
+		/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception>
+		protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
+		{
+			NmsChannelHelper.ValidateTimeout(timeout);
+
+			// Only make sure a channel is available while the listener is not disposed.
+			if (!IsDisposed)
+			{
+				EnsureChannelAvailable();
+			}
+
+			IAsyncResult pendingDequeue = _channelQueue.BeginDequeue(timeout, callback, state);
+			return pendingDequeue;
+		}
+
+		/// <summary>
+		/// Completes the asynchronous accept started by <c>OnBeginAcceptChannel</c>.
+		/// </summary>
+		/// <returns>
+		/// The channel taken from the queue.
+		/// </returns>
+		/// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <c>OnBeginAcceptChannel</c>.</param>
+		/// <exception cref="T:System.TimeoutException">The queue's <c>EndDequeue</c> returned <see langword="false"/>.</exception>
+		protected override IInputChannel OnEndAcceptChannel(IAsyncResult result)
+		{
+			IInputChannel accepted;
+			if (!_channelQueue.EndDequeue(result, out accepted))
+			{
+				throw new TimeoutException();
+			}
+
+			return accepted;
+		}
+
+		#endregion
+
+		/// <summary>
+		/// Callback-shaped entry point that casts <paramref name="state"/> to a message
+		/// and passes it to <c>Dispatch</c>.
+		/// </summary>
+		/// <param name="state">The message to dispatch, typed as <see cref="object"/>.</param>
+		internal void DispatchCallback(object state)
+		{
+			Message incoming = (Message)state;
+			Dispatch(incoming);
+		}
+
+		/// <summary>
+		/// Hands an incoming message to the listener's current channel, creating that
+		/// channel first if none exists. A channel created here is also placed on the
+		/// channel queue so that whoever is waiting in AcceptChannel() receives it.
+		/// </summary>
+		/// <remarks>
+		/// A <c>null</c> message is ignored. Any exception raised while dispatching is
+		/// written to the console and is not rethrown.
+		/// </remarks>
+		/// <param name="message">The message to dispatch; may be <c>null</c>.</param>
+		internal void Dispatch(Message message)
+		{
+			if (message == null)
+			{
+				return;
+			}
+
+			Console.WriteLine("Dispatching incoming message");
+			try
+			{
+				NmsInputChannel newChannel;
+				bool channelCreated = CreateOrRetrieveChannel(out newChannel);
+
+				try
+				{
+					newChannel.Dispatch(message);
+				}
+				finally
+				{
+					// A freshly created channel is already recorded as the current channel, so
+					// nobody else will ever enqueue it. Hand it off even when dispatching the
+					// message failed; otherwise pending AcceptChannel() calls would never see it.
+					if (channelCreated)
+					{
+						//Hand the channel off to whomever is waiting for AcceptChannel() to complete
+						_channelQueue.EnqueueAndDispatch(newChannel);
+					}
+				}
+			}
+			catch (Exception e)
+			{
+				Console.WriteLine("Error dispatching Message.");
+				Console.WriteLine(e.ToString());
+			}
+		}
+
+		/// <summary>
+		/// Returns the listener's current channel, creating (and registering) a new one
+		/// when no current channel exists.
+		/// </summary>
+		/// <param name="newChannel">Receives the current channel, whether it already existed or was created by this call.</param>
+		/// <returns><c>true</c> if this call created the channel; <c>false</c> if an existing channel was returned.</returns>
+		private bool CreateOrRetrieveChannel(out NmsInputChannel newChannel)
+		{
+			// First look without taking the lock.
+			newChannel = _currentChannel;
+			if (newChannel != null)
+			{
+				return false;
+			}
+
+			lock (_currentChannelLock)
+			{
+				// Look again now that the lock is held: another caller may have created the channel meanwhile.
+				newChannel = _currentChannel;
+				if (newChannel != null)
+				{
+					return false;
+				}
+
+				newChannel = CreateNmsChannel(Uri);
+				// Subscribe so the current-channel reference is cleared when this channel closes.
+				newChannel.Closed += OnChannelClosed;
+				_currentChannel = newChannel;
+				return true;
+			}
+		}
+
+		/// <summary>
+		/// Handles a channel's Closed event by forgetting the channel if it is the
+		/// listener's current one.
+		/// </summary>
+		/// <param name="sender">The channel that was closed; cast to <see cref="NmsInputChannel" />.</param>
+		/// <param name="args">The <see cref="System.EventArgs"/> instance containing the event data (not used).</param>
+		private void OnChannelClosed(object sender, EventArgs args)
+		{
+			NmsInputChannel closedChannel = (NmsInputChannel)sender;
+
+			lock (_currentChannelLock)
+			{
+				bool isCurrent = closedChannel == _currentChannel;
+				if (isCurrent)
+				{
+					// Clearing the reference lets the next CreateOrRetrieveChannel call build a new channel.
+					_currentChannel = null;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Opens the connection, session, destination and consumer for this listener
+		/// (storing each in its field) and builds the <see cref="NmsInputChannel" />
+		/// that will receive inbound messages.
+		/// </summary>
+		/// <param name="uri">The URI used both to open the connection and as the channel's endpoint address.</param>
+		/// <returns>A new <see cref="NmsInputChannel" /> bound to this listener.</returns>
+		private NmsInputChannel CreateNmsChannel(Uri uri)
+		{
+			// Each step depends on the result of the previous one, so the order is fixed.
+			_connection = OpenConnection(uri);
+			_session = OpenSession(_connection);
+			_destination = NmsChannelHelper.GetDestination(_session, Destination, DestinationType);
+			_consumer = CreateConsumer(_session, _destination);
+
+			EndpointAddress localAddress = new EndpointAddress(uri);
+			NmsInputChannel channel = new NmsInputChannel(_bufferManager, _messageEncoderFactory, localAddress, this);
+			return channel;
+		}
+
+		/// <summary>
+		/// Creates and starts a connection to the message broker.
+		/// </summary>
+		/// <param name="uri">The URI passed to the connection factory.</param>
+		/// <returns>A started connection whose ExceptionListener has
+		/// <see cref="OnExceptionThrown" /> attached.</returns>
+		private IConnection OpenConnection(Uri uri)
+		{
+			IConnection brokerConnection = ConnectionFactoryManager.GetInstance().CreateConnection(uri);
+
+			// Attach the exception handler before starting the connection.
+			brokerConnection.ExceptionListener += OnExceptionThrown;
+			brokerConnection.Start();
+
+			Console.WriteLine("Connection open");
+			return brokerConnection;
+		}
+
+		/// <summary>
+		/// Creates a session on an already started connection.
+		/// </summary>
+		/// <param name="connection">The connection to the message broker.</param>
+		/// <returns>A session created from <paramref name="connection" />.</returns>
+		/// <exception cref="InvalidOperationException">the <paramref name="connection" /> has not yet
+		/// been started.</exception>
+		private NMS.ISession OpenSession(IConnection connection)
+		{
+			if (connection.IsStarted)
+			{
+				Console.WriteLine("Opening session...");
+				NMS.ISession openedSession = connection.CreateSession();
+				Console.WriteLine("Session open");
+				return openedSession;
+			}
+
+			throw new InvalidOperationException("The connection has not yet been opened");
+		}
+
+		/// <summary>
+		/// Resolves the destination named by the URI. This method always asks the
+		/// session for a queue.
+		/// </summary>
+		/// <param name="session">The session that will be used for communicating with the destination.</param>
+		/// <param name="uri">The URI from which the queue name is extracted.</param>
+		/// <returns>The queue destination obtained from <paramref name="session" />.</returns>
+		private IDestination OpenDestination(NMS.ISession session, Uri uri)
+		{
+			string name = NmsChannelHelper.GetQueueName(uri);
+
+			Console.WriteLine("Connecting to queue '{0}'...", name);
+			IDestination queue = session.GetQueue(name);
+			Console.WriteLine("Connected to queue");
+
+			return queue;
+		}
+
+		/// <summary>
+		/// Creates a consumer for <paramref name="destination"/> on the given
+		/// <paramref name="session"/> and subscribes this listener to it.
+		/// </summary>
+		/// <param name="session">The session used to create the consumer.</param>
+		/// <param name="destination">The destination to consume from.</param>
+		/// <returns>A consumer whose Listener has <see cref="OnReceiveMessage" /> attached.</returns>
+		private IMessageConsumer CreateConsumer(NMS.ISession session, IDestination destination)
+		{
+			Console.WriteLine("Creating message listener...");
+
+			IMessageConsumer messageConsumer = session.CreateConsumer(destination);
+			// Received messages are routed to OnReceiveMessage.
+			messageConsumer.Listener += OnReceiveMessage;
+
+			Console.WriteLine("Created message listener");
+			return messageConsumer;
+		}
+
+		/// <summary>
+		/// Event handler that converts a received NMS text message into a WCF
+		/// <see cref="Message" /> and dispatches it.
+		/// </summary>
+		/// <remarks>
+		/// The message text is encoded as UTF-8 rather than ASCII so that characters
+		/// outside the 7-bit range are not replaced with '?' before the encoder reads
+		/// them. For pure ASCII text the resulting bytes are identical.
+		/// </remarks>
+		/// <param name="message">The received message; it is cast to <see cref="ITextMessage" />.</param>
+		private void OnReceiveMessage(IMessage message)
+		{
+			Console.WriteLine("Received message");
+			string soapMsg = ((ITextMessage)message).Text;
+
+			// Encode straight into a buffer taken from the buffer manager instead of
+			// encoding into a temporary array and copying it across.
+			Encoding encoding = Encoding.UTF8;
+			int dataLength = encoding.GetByteCount(soapMsg);
+			byte[] pooledBuffer = _bufferManager.TakeBuffer(dataLength);
+			encoding.GetBytes(soapMsg, 0, soapMsg.Length, pooledBuffer, 0);
+
+			// The buffer handed out may be larger than requested, so bound the payload explicitly.
+			ArraySegment<byte> data = new ArraySegment<byte>(pooledBuffer, 0, dataLength);
+
+			Message msg = _messageEncoderFactory.Encoder.ReadMessage(data, _bufferManager);
+
+			Console.WriteLine(msg);
+			Dispatch(msg);
+		}
+
+		/// <summary>
+		/// Handler attached to the connection's ExceptionListener. Writes the exception's
+		/// type name, message and stack trace to the console, except for exceptions whose
+		/// exact type is <see cref="NullReferenceException" />, which are ignored.
+		/// </summary>
+		/// <param name="exception">The exception that was thrown.</param>
+		private void OnExceptionThrown(Exception exception)
+		{
+			Type thrownType = exception.GetType();
+
+			// TODO: investigate whether it is normal behaviour for a NRE to be thrown during shutdown
+			bool isNullReference = thrownType == typeof(NullReferenceException);
+			if (!isNullReference)
+			{
+				// TODO: can we recover from the exception? Do we convert to WCF exceptions?
+				Console.WriteLine("{0} thrown : {1}\n{2}",
+					thrownType.Name,
+					exception.Message,
+					exception.StackTrace);
+			}
+		}
+
+		/// <summary>
+		/// Makes sure the listener has a current channel; if one had to be created,
+		/// it is placed on the channel queue for acceptors.
+		/// </summary>
+		private void EnsureChannelAvailable()
+		{
+			NmsInputChannel channel;
+			bool created = CreateOrRetrieveChannel(out channel);
+
+			// An already existing channel needs no further hand-off here.
+			if (!created)
+			{
+				return;
+			}
+
+			_channelQueue.EnqueueAndDispatch(channel);
+		}
+
+		#region Private members
+
+		private readonly Uri _uri; // NOTE(review): presumably backs the Uri property used when creating the channel - confirm
+		private IConnection _connection; // connection returned by OpenConnection; assigned in CreateNmsChannel
+		private ISession _session; // session returned by OpenSession; assigned in CreateNmsChannel
+		private IDestination _destination; // destination returned by NmsChannelHelper.GetDestination; assigned in CreateNmsChannel
+		private IMessageConsumer _consumer; // consumer returned by CreateConsumer; assigned in CreateNmsChannel
+		private readonly InputQueue<IInputChannel> _channelQueue; // newly created channels are enqueued here; accept operations dequeue from it
+		private NmsInputChannel _currentChannel; // channel that receives dispatched messages; set back to null when that channel closes
+		private readonly object _currentChannelLock; // lock object held while creating or clearing _currentChannel
+		private readonly MessageEncoderFactory _messageEncoderFactory; // its Encoder reads incoming message bytes; also passed to new channels
+		private readonly BufferManager _bufferManager; // source of the buffers used for incoming message bytes; also passed to new channels
+		private string _destinationName; // NOTE(review): presumably backs the Destination property - confirm
+		private DestinationType _destinationType; // NOTE(review): presumably backs the DestinationType property - confirm
+
+		#endregion
+	}
+}
\ No newline at end of file



Mime
View raw message