activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r924338 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Connection.cs main/csharp/MessageConsumer.cs main/csharp/MessageProducer.cs main/csharp/QueueBrowser.cs main/csharp/Session.cs test/csharp/QueueBrowserTests.cs
Date Wed, 17 Mar 2010 15:49:09 GMT
Author: tabish
Date: Wed Mar 17 15:49:09 2010
New Revision: 924338

URL: http://svn.apache.org/viewvc?rev=924338&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQNET-206

Add QueueBrowser implementation based on the supplied patch.  Changes were made where needed
to better integrate it into the codebase.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=924338&r1=924337&r2=924338&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Wed Mar
17 15:49:09 2010
@@ -769,15 +769,15 @@ namespace Apache.NMS.ActiveMQ
 
             foreach(Session session in this.sessions)
             {
-				try
-				{
-                	session.ClearMessagesInProgress();
-				}
-				catch(Exception ex)
-				{
-					Tracer.Warn("Exception while clearing messages: " + ex.Message);
-					Tracer.Warn(ex.StackTrace);
-				}
+                try
+                {
+                    session.ClearMessagesInProgress();
+                }
+                catch(Exception ex)
+                {
+                    Tracer.Warn("Exception while clearing messages: " + ex.Message);
+                    Tracer.Warn(ex.StackTrace);
+                }
             }
 
             if(this.ConnectionInterruptedListener != null && !this.closing )
@@ -824,14 +824,6 @@ namespace Apache.NMS.ActiveMQ
         }
 
         /// <summary>
-        /// Creates a new temporary destination name
-        /// </summary>
-        public String CreateTemporaryDestinationName()
-        {
-            return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
-        }
-
-        /// <summary>
         /// Creates a new local transaction ID
         /// </summary>
         public LocalTransactionId CreateLocalTransactionId()
@@ -852,6 +844,37 @@ namespace Apache.NMS.ActiveMQ
             return answer;
         }
 
+        public ActiveMQTempDestination CreateTemporaryDestination(bool topic)
+        {
+           ActiveMQTempDestination destination = null;
+
+           if(topic)
+           {
+               destination = new ActiveMQTempTopic(
+                   info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter));
+           }
+           else
+           {
+               destination = new ActiveMQTempQueue(
+                   info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter));
+           }
+
+            DestinationInfo command = new DestinationInfo();
+            command.ConnectionId = ConnectionId;
+            command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
+            command.Destination = destination;
+
+            this.SyncRequest(command);
+
+            destination.Connection = this;
+
+            return destination;
+        }
+
+        protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
+        {
+        }
+
         public void DeleteTemporaryDestination(IDestination destination)
         {
             this.DeleteDestination(destination);

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=924338&r1=924337&r2=924338&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Wed Mar 17 15:49:09 2010
@@ -66,11 +66,54 @@ namespace Apache.NMS.ActiveMQ
 		private IRedeliveryPolicy redeliveryPolicy;
 
 		// Constructor internal to prevent clients from creating an instance.
-		internal MessageConsumer(Session session, ConsumerInfo info)
-		{
+		internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,

+                                 String name, String selector, int prefetch, int maxPendingMessageCount,

+                                 bool noLocal, bool browser, bool dispatchAsync )
+		{
+            if(destination == null)
+            {
+                throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+            }
+            
 			this.session = session;
-			this.info = info;
-			this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+            this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+			
+            this.info = new ConsumerInfo();
+            this.info.ConsumerId = id;
+            this.info.Destination = destination;
+            this.info.SubscriptionName = name;
+            this.info.Selector = selector;
+            this.info.PrefetchSize = prefetch;
+            this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
+            this.info.NoLocal = noLocal;
+            this.info.Browser = browser;
+            this.info.DispatchAsync = dispatchAsync;
+            this.info.Retroactive = session.Retroactive;
+            this.info.Exclusive = session.Exclusive;
+            this.info.Priority = session.Priority;
+            
+            // If the destination contained a URI query, then use it to set public properties
+            // on the ConsumerInfo
+            if(destination.Options != null)
+            {
+                URISupport.SetProperties(this.info, destination.Options, "consumer.");
+            }
+            
+//            try
+//            {
+//                this.session.AddConsumer(this);
+//                this.session.Connection.SyncRequest(this.info);
+//
+//                if(this.session.Connection.IsStarted)
+//                {
+//                    this.Start();
+//                }
+//            }
+//            catch(Exception)
+//            {
+//                this.session.RemoveConsumer(this.info.ConsumerId);
+//                throw;
+//            }
 		}
 
 		~MessageConsumer()
@@ -87,9 +130,14 @@ namespace Apache.NMS.ActiveMQ
 
 		public ConsumerId ConsumerId
 		{
-			get { return info.ConsumerId; }
+			get { return this.info.ConsumerId; }
 		}
 
+        public ConsumerInfo ConsumerInfo
+        {
+            get { return this.info; }
+        }
+
 		public int RedeliveryTimeout
 		{
 			get { return redeliveryTimeout; }
@@ -106,6 +154,11 @@ namespace Apache.NMS.ActiveMQ
 			get { return this.redeliveryPolicy; }
 			set { this.redeliveryPolicy = value; }
 		}
+        
+        public long UnconsumedMessageCount
+        {
+            get { return this.unconsumedMessages.Count; }
+        }   
 
 		#endregion
 
@@ -286,7 +339,7 @@ namespace Apache.NMS.ActiveMQ
 				}
 
 				this.unconsumedMessages.Close();
-				this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
+				this.session.RemoveConsumer(this.info.ConsumerId);
 
                 RemoveInfo removeCommand = new RemoveInfo();
 				removeCommand.ObjectId = this.info.ConsumerId;
@@ -444,7 +497,7 @@ namespace Apache.NMS.ActiveMQ
 			}
 		}
 
-		public void Dispatch(MessageDispatch dispatch)
+		public virtual void Dispatch(MessageDispatch dispatch)
 		{
 			MessageListener listener = this.listener;
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=924338&r1=924337&r2=924338&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
Wed Mar 17 15:49:09 2010
@@ -43,17 +43,28 @@ namespace Apache.NMS.ActiveMQ
         private bool disableMessageTimestamp = false;
         protected bool disposed = false;
 
-        public MessageProducer(Session session, ProducerInfo info)
+        public MessageProducer(Session session, ProducerId id, ActiveMQDestination destination,
TimeSpan requestTimeout)
         {
-            this.session = session;
-            this.info = info;
-            this.RequestTimeout = session.RequestTimeout;
+            this.session = session;            
+            this.RequestTimeout = requestTimeout;
 
+            this.info = new ProducerInfo();
+            this.info.ProducerId = id;
+            this.info.Destination = destination;
+            this.info.WindowSize = session.Connection.ProducerWindowSize;
+            
+            // If the destination contained a URI query, then use it to set public
+            // properties on the ProducerInfo
+            if(destination != null && destination.Options != null)
+            {
+                URISupport.SetProperties(this.info, destination.Options, "producer.");
+            }
+            
             // Version Three and higher will send us a ProducerAck, but only if we
             // have a set producer window size.
-            if( session.Connection.ProtocolVersion >= 3 && info.WindowSize >
0 )
+            if(session.Connection.ProtocolVersion >= 3 && this.info.WindowSize
> 0)
             {
-                usage = new MemoryUsage( info.WindowSize );
+                this.usage = new MemoryUsage(this.info.WindowSize);
             }
         }
 
@@ -120,7 +131,7 @@ namespace Apache.NMS.ActiveMQ
 
                 try
                 {
-                    session.DisposeOf(info.ProducerId);
+                    session.RemoveProducer(info.ProducerId);
                 }
                 catch(Exception ex)
                 {
@@ -231,6 +242,11 @@ namespace Apache.NMS.ActiveMQ
             get { return info.ProducerId; }
         }
 
+        public ProducerInfo ProducerInfo
+        {
+            get { return info; }
+        }
+
         public MsgDeliveryMode DeliveryMode
         {
             get { return msgDeliveryMode; }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs?rev=924338&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs Wed
Mar 17 15:49:09 2010
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Text;
+using Apache.NMS.ActiveMQ.Commands;
+using System.Threading;
+using Apache.NMS.Util;
+using System.Runtime.CompilerServices;
+using System.Diagnostics;
+
+namespace Apache.NMS.ActiveMQ
+{
+	public class QueueBrowser : IQueueBrowser, IEnumerator
+	{
+		private readonly Session session;
+		private readonly ActiveMQDestination destination;
+		private readonly string selector;
+
+		private MessageConsumer consumer;
+		private bool closed;
+		private readonly ConsumerId consumerId;
+		private readonly Atomic<bool> browseDone = new Atomic<bool>(true);
+		private readonly bool dispatchAsync;
+		private object semaphore = new object();
+		private object myLock = new object();
+
+		internal QueueBrowser(Session session, ConsumerId consumerId, ActiveMQDestination destination,
string selector, bool dispatchAsync)
+		{
+			this.session = session;
+			this.consumerId = consumerId;
+			this.destination = destination;
+			this.selector = selector;
+			this.dispatchAsync = dispatchAsync;
+			this.consumer = CreateConsumer();
+		}
+
+		private MessageConsumer CreateConsumer()
+		{
+            this.browseDone.Value = false;
+			BrowsingMessageConsumer consumer = null;
+
+			try
+			{
+                consumer = new BrowsingMessageConsumer(
+                    this, session, this.consumerId, this.destination, null, this.selector,

+                    this.session.Connection.PrefetchPolicy.QueueBrowserPrefetch,
+                    this.session.Connection.PrefetchPolicy.MaximumPendingMessageLimit,
+                    false, true, this.dispatchAsync);
+
+                this.session.AddConsumer(consumer);
+                this.session.Connection.SyncRequest(consumer.ConsumerInfo);
+
+                if(this.session.Connection.IsStarted)
+                {
+                    consumer.Start();
+                }
+            }
+            catch(Exception)
+            {
+                if(consumer != null)
+                {
+                    this.session.RemoveConsumer(consumer.ConsumerId);
+                    consumer.Close();
+                }
+
+                throw;
+            }
+
+            return consumer;
+		}
+
+		private void DestroyConsumer()
+		{
+			if(consumer == null)
+            {
+				return;
+            }
+
+			try
+			{
+				consumer.Close();
+				consumer = null;
+			}
+			catch(NMSException e)
+			{
+                Tracer.Debug(e.StackTrace.ToString());
+			}
+		}
+
+		public IEnumerator GetEnumerator()
+		{
+			CheckClosed();
+
+			if(this.consumer == null)
+            {
+				this.consumer = CreateConsumer();
+            }
+
+			return this;
+		}
+
+
+		private void CheckClosed()
+		{
+			if(this.closed)
+            {
+				throw new IllegalStateException("The Consumer is closed");
+            }
+		}
+
+		public bool MoveNext()
+		{
+			while(true)
+			{
+				lock(myLock)
+				{
+					if(consumer == null)
+                    {
+						return false;
+                    }
+				}
+
+				if(consumer.UnconsumedMessageCount > 0)
+                {
+					return true;
+                }
+
+				if(browseDone.Value || !session.Started)
+				{
+					DestroyConsumer();
+					return false;
+				}
+
+				WaitForMessage();
+			}
+		}
+
+		public object Current
+		{
+			get
+			{
+				while(true)
+				{
+					lock(myLock)
+					{
+						if(consumer == null)
+                        {
+							return null;
+                        }
+					}
+
+					try
+					{
+						IMessage answer = consumer.ReceiveNoWait();
+
+						if(answer != null)
+                        {
+							return answer;
+                        }
+					}
+					catch(NMSException)
+					{
+						//TODO: Not implemented.
+						//this.session.Connection.OnClientInternalException(e);
+						return null;
+					}
+
+					if(browseDone.Value || !session.Started)
+					{
+						DestroyConsumer();
+						return null;
+					}
+
+					WaitForMessage();
+				}
+			}
+		}
+
+		[MethodImpl(MethodImplOptions.Synchronized)]
+		public void Close()
+		{
+			DestroyConsumer();
+			closed = true;
+		}
+
+		public IQueue Queue
+		{
+			get { return (IQueue)destination; }
+		}
+
+		public string MessageSelector
+		{
+			get { return selector; }
+		}
+
+		protected void WaitForMessage()
+		{
+			try
+			{
+				lock(semaphore)
+				{
+					Monitor.Wait(semaphore, 2000);
+				}
+			}
+			catch(ThreadInterruptedException)
+			{
+				Thread.CurrentThread.Interrupt();
+			}
+		}
+
+		protected void NotifyMessageAvailable()
+		{
+			lock(semaphore)
+			{
+				Monitor.PulseAll(semaphore);
+			}
+		}
+
+		public override string ToString()
+		{
+			return "QueueBrowser { value=" + consumerId + " }";
+		}
+
+		public void Reset()
+		{
+			if(consumer != null)
+            {
+				DestroyConsumer();
+            }
+
+			consumer = CreateConsumer();
+		}
+
+		public class BrowsingMessageConsumer : MessageConsumer
+		{
+			private QueueBrowser parent;
+
+			public BrowsingMessageConsumer(QueueBrowser parent, Session session, ConsumerId id, ActiveMQDestination
destination, 
+                                           String name, String selector, int prefetch, int
maxPendingMessageCount, 
+                                           bool noLocal, bool browser, bool dispatchAsync)
+				: base(session, id, destination, name, selector, prefetch, maxPendingMessageCount, noLocal,
browser, dispatchAsync)
+			{
+				this.parent = parent;
+			}
+
+			public override void Dispatch(MessageDispatch md)
+			{
+				if(md.Message == null)
+                {
+					parent.browseDone.Value = true;
+                }
+				else
+				{
+					base.Dispatch(md);
+				}
+
+				parent.NotifyMessageAvailable();
+			}
+		}
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=924338&r1=924337&r2=924338&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Wed Mar
17 15:49:09 2010
@@ -356,29 +356,32 @@ namespace Apache.NMS.ActiveMQ
 
         public IMessageProducer CreateProducer(IDestination destination)
         {
-            ProducerInfo command = CreateProducerInfo(destination);
-            ProducerId producerId = command.ProducerId;
             MessageProducer producer = null;
 
             try
             {
-                producer = new MessageProducer(this, command);
-                producers[producerId] = producer;
-                this.connection.Oneway(command);
+                ActiveMQDestination dest = null;
+                if(destination != null)
+                {
+                    dest = ActiveMQDestination.Transform(destination);
+                }
+
+                producer = new MessageProducer(this, GetNextProducerId(), dest, this.RequestTimeout);
+
+                this.AddProducer(producer);
+                this.Connection.Oneway(producer.ProducerInfo);
             }
             catch(Exception)
             {
                 if(producer != null)
                 {
+                    this.RemoveProducer(producer.ProducerId);
                     producer.Close();
                 }
 
                 throw;
             }
 
-            // Registered with Connection so it can process Producer Acks.
-            connection.addProducer(producerId, producer);
-
             return producer;
         }
 
@@ -394,78 +397,83 @@ namespace Apache.NMS.ActiveMQ
 
         public IMessageConsumer CreateConsumer(IDestination destination, string selector,
bool noLocal)
         {
-            if (destination == null)
+            if(destination == null)
             {
                 throw new InvalidDestinationException("Cannot create a Consumer with a Null
destination");
             }
 
-            ConsumerInfo command = CreateConsumerInfo(destination, selector);
-            command.NoLocal = noLocal;
-            ConsumerId consumerId = command.ConsumerId;
-            MessageConsumer consumer = null;
+            ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
+            int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch;
 
-            // Registered with Connection before we register at the broker.
-            connection.addDispatcher(consumerId, this);
+            if(dest is ITopic || dest is ITemporaryTopic)
+            {
+                prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
+            }
+            else if(dest is IQueue || dest is ITemporaryQueue)
+            {
+                prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
+            }
+
+            MessageConsumer consumer = null;
 
             try
             {
-                consumer = new MessageConsumer(this, command);
-                // lets register the consumer first in case we start dispatching messages
immediately
-                consumers[consumerId] = consumer;
-                this.Connection.SyncRequest(command);
+                consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector,
prefetchSize,
+                                               this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
+                                               noLocal, false, this.connection.DispatchAsync);
 
-                if(this.Started)
+                this.AddConsumer(consumer);
+                this.Connection.SyncRequest(consumer.ConsumerInfo);
+
+                if(this.Connection.IsStarted)
                 {
                     consumer.Start();
                 }
-
-                return consumer;
             }
             catch(Exception)
             {
                 if(consumer != null)
                 {
+                    this.RemoveConsumer(consumer.ConsumerId);
                     consumer.Close();
                 }
 
                 throw;
             }
+
+            return consumer;
         }
 
         public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string
selector, bool noLocal)
         {
-            if (destination == null)
+            if(destination == null)
             {
                 throw new InvalidDestinationException("Cannot create a Consumer with a Null
destination");
             }
 
-            ConsumerInfo command = CreateConsumerInfo(destination, selector);
-            ConsumerId consumerId = command.ConsumerId;
-            command.SubscriptionName = name;
-            command.NoLocal = noLocal;
-            command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch;
+            ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
             MessageConsumer consumer = null;
 
-            // Registered with Connection before we register at the broker.
-            connection.addDispatcher(consumerId, this);
-
             try
             {
-                consumer = new MessageConsumer(this, command);
-                // lets register the consumer first in case we start dispatching messages
immediately
-                consumers[consumerId] = consumer;
+                consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector,
+                                               this.connection.PrefetchPolicy.DurableTopicPrefetch,
+                                               this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
+                                               noLocal, false, this.connection.DispatchAsync);
 
-                if(this.Started)
+                this.AddConsumer(consumer);
+                this.Connection.SyncRequest(consumer.ConsumerInfo);
+
+                if(this.Connection.IsStarted)
                 {
                     consumer.Start();
                 }
-
-                this.connection.SyncRequest(command);
             }
             catch(Exception)
             {
                 if(consumer != null)
                 {
+                    this.RemoveConsumer(consumer.ConsumerId);
                     consumer.Close();
                 }
 
@@ -486,12 +494,34 @@ namespace Apache.NMS.ActiveMQ
 
         public IQueueBrowser CreateBrowser(IQueue queue)
         {
-            throw new NotSupportedException("Not Yet Implemented");
+            return this.CreateBrowser(queue, null);
         }
 
         public IQueueBrowser CreateBrowser(IQueue queue, string selector)
         {
-            throw new NotSupportedException("Not Yet Implemented");
+            if(queue == null)
+            {
+                throw new InvalidDestinationException("Cannot create a Consumer with a Null
destination");
+            }
+
+            ActiveMQDestination dest = ActiveMQDestination.Transform(queue);
+            QueueBrowser browser = null;
+
+            try
+            {
+                browser = new QueueBrowser(this, GetNextConsumerId(), dest, selector, this.DispatchAsync);
+            }
+            catch(Exception)
+            {
+                if(browser != null)
+                {
+                    browser.Close();
+                }
+
+                throw;
+            }
+
+            return browser;
         }
 
         public IQueue GetQueue(string name)
@@ -506,16 +536,12 @@ namespace Apache.NMS.ActiveMQ
 
         public ITemporaryQueue CreateTemporaryQueue()
         {
-            ActiveMQTempQueue answer = new ActiveMQTempQueue(Connection.CreateTemporaryDestinationName());
-            CreateTemporaryDestination(answer);
-            return answer;
+            return (ITemporaryQueue)this.connection.CreateTemporaryDestination(false);
         }
 
         public ITemporaryTopic CreateTemporaryTopic()
         {
-            ActiveMQTempTopic answer = new ActiveMQTempTopic(Connection.CreateTemporaryDestinationName());
-            CreateTemporaryDestination(answer);
-            return answer;
+            return (ITemporaryTopic)this.connection.CreateTemporaryDestination(true);
         }
 
         /// <summary>
@@ -523,12 +549,7 @@ namespace Apache.NMS.ActiveMQ
         /// </summary>
         public void DeleteDestination(IDestination destination)
         {
-            DestinationInfo command = new DestinationInfo();
-            command.ConnectionId = Connection.ConnectionId;
-            command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
-            command.Destination = (ActiveMQDestination) destination;
-
-            this.connection.Oneway(command);
+            this.connection.DeleteDestination(destination);
         }
 
         public IMessage CreateMessage()
@@ -604,16 +625,6 @@ namespace Apache.NMS.ActiveMQ
 
         #endregion
 
-        protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
-        {
-            DestinationInfo command = new DestinationInfo();
-            command.ConnectionId = Connection.ConnectionId;
-            command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
-            command.Destination = tempDestination;
-
-            this.connection.SyncRequest(command);
-        }
-
         public void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage
producerWindow, TimeSpan sendTimeout )
         {
             ActiveMQMessage msg = message;
@@ -673,18 +684,33 @@ namespace Apache.NMS.ActiveMQ
             }
         }
 
-        public void DisposeOf(ConsumerId objectId, long lastDeliveredSequenceId)
+        public void AddConsumer(MessageConsumer consumer)
         {
-            connection.removeDispatcher(objectId);
-            this.lastDeliveredSequenceId = Math.Min(this.lastDeliveredSequenceId, lastDeliveredSequenceId);
+            ConsumerId id = consumer.ConsumerId;
 
+            // Registered with Connection before we register at the broker.
+            consumers[id] = consumer;
+            connection.addDispatcher(id, this);
+        }
+
+        public void RemoveConsumer(ConsumerId objectId)
+        {
+            connection.removeDispatcher(objectId);
             if(!this.closing)
             {
                 consumers.Remove(objectId);
             }
         }
 
-        public void DisposeOf(ProducerId objectId)
+        public void AddProducer(MessageProducer producer)
+        {
+            ProducerId id = producer.ProducerId;
+
+            this.producers[id] = producer;
+            this.connection.addProducer(id, producer);
+        }
+
+        public void RemoveProducer(ProducerId objectId)
         {
             connection.removeProducer(objectId);
             if(!this.closing)
@@ -693,62 +719,24 @@ namespace Apache.NMS.ActiveMQ
             }
         }
 
-        protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string
selector)
+        public ConsumerId GetNextConsumerId()
         {
-            ConsumerInfo answer = new ConsumerInfo();
             ConsumerId id = new ConsumerId();
             id.ConnectionId = info.SessionId.ConnectionId;
             id.SessionId = info.SessionId.Value;
             id.Value = Interlocked.Increment(ref consumerCounter);
-            answer.ConsumerId = id;
-            answer.Destination = ActiveMQDestination.Transform(destination);
-            answer.Selector = selector;
-            answer.Priority = this.Priority;
-            answer.Exclusive = this.Exclusive;
-            answer.DispatchAsync = this.DispatchAsync;
-            answer.Retroactive = this.Retroactive;
-            answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
-
-            if(destination is ITopic || destination is ITemporaryTopic)
-            {
-                answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
-            }
-            else if(destination is IQueue || destination is ITemporaryQueue)
-            {
-                answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
-            }
-
-            // If the destination contained a URI query, then use it to set public properties
-            // on the ConsumerInfo
-            ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-            if(amqDestination != null && amqDestination.Options != null)
-            {
-                URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
-            }
 
-            return answer;
+            return id;
         }
 
-        protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
+        public ProducerId GetNextProducerId()
         {
-            ProducerInfo answer = new ProducerInfo();
             ProducerId id = new ProducerId();
             id.ConnectionId = info.SessionId.ConnectionId;
             id.SessionId = info.SessionId.Value;
             id.Value = Interlocked.Increment(ref producerCounter);
-            answer.ProducerId = id;
-            answer.Destination = ActiveMQDestination.Transform(destination);
-            answer.WindowSize = connection.ProducerWindowSize;
-
-            // If the destination contained a URI query, then use it to set public
-            // properties on the ProducerInfo
-            ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-            if(amqDestination != null && amqDestination.Options != null)
-            {
-                URISupport.SetProperties(answer, amqDestination.Options, "producer.");
-            }
 
-            return answer;
+            return id;
         }
 
         public void Stop()

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs?rev=924338&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs
Wed Mar 17 15:49:09 2010
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Diagnostics;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Test;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+	[TestFixture]
+	public class QueueBrowserTests : NMSTestSupport
+	{
+		[Test]
+		public void TestReceiveBrowseReceive()
+		{
+			using (IConnection connection = CreateConnection())
+			{
+				using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+				{
+                    IDestination destination = session.GetQueue("TestReceiveBrowseReceive");
+					IMessageProducer producer = session.CreateProducer(destination);
+					IMessageConsumer consumer = session.CreateConsumer(destination);
+					connection.Start();
+
+					IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"),
+                                                         session.CreateTextMessage("Second Message"),
+                                                         session.CreateTextMessage("Third Message")};
+
+					// lets consume any outstanding messages from previous test runs
+					while (consumer.Receive(TimeSpan.FromMilliseconds(1000)) != null)
+					{
+					}
+
+					producer.Send(outbound[0]);
+					producer.Send(outbound[1]);
+					producer.Send(outbound[2]);
+
+					IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+
+					// Get the first.
+					Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)msg).Text);
+					consumer.Close();
+
+					IQueueBrowser browser = session.CreateBrowser((IQueue)destination);
+					IEnumerator enumeration = browser.GetEnumerator();
+
+					// browse the second
+					Assert.IsTrue(enumeration.MoveNext(), "should have received the second message");
+					Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text);
+
+					// browse the third.
+					Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message");
+					Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text);
+
+					// There should be no more.
+					bool tooMany = false;
+					while (enumeration.MoveNext())
+					{
+						Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text);
+						tooMany = true;
+					}
+					Assert.IsFalse(tooMany);
+
+					//Reset should take us back to the start.
+					enumeration.Reset();
+
+					// browse the second
+					Assert.IsTrue(enumeration.MoveNext(), "should have received the second message");
+					Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text);
+
+					// browse the third.
+					Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message");
+					Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text);
+
+					// There should be no more.
+					tooMany = false;
+					while (enumeration.MoveNext())
+					{
+						Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text);
+						tooMany = true;
+					}
+					Assert.IsFalse(tooMany);
+
+					browser.Close();
+
+					// Re-open the consumer.
+					consumer = session.CreateConsumer(destination);
+
+					// Receive the second.
+					Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text);
+					// Receive the third.
+					Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text);
+					consumer.Close();
+				}
+			}
+		}
+
+		[Test]
+		public void TestBrowseReceive()
+		{
+			using (IConnection connection = CreateConnection())
+			{
+				using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+				{
+                    IDestination destination = session.GetQueue("TestBrowseReceive");
+
+					connection.Start();
+
+                    using(IMessageConsumer purger = session.CreateConsumer(destination))
+                    {
+                        // lets consume any outstanding messages from previous test runs
+                        while(purger.Receive(TimeSpan.FromMilliseconds(1000)) != null)
+                        {
+                        }
+                        
+                        purger.Close();
+                    }
+                    
+					IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"),
+	                                                     session.CreateTextMessage("Second Message"),
+	                                                     session.CreateTextMessage("Third Message")};
+
+					IMessageProducer producer = session.CreateProducer(destination);
+					producer.Send(outbound[0]);
+
+					// create browser first
+					IQueueBrowser browser = session.CreateBrowser((IQueue)destination);
+					IEnumerator enumeration = browser.GetEnumerator();
+
+					// create consumer
+					IMessageConsumer consumer = session.CreateConsumer(destination);
+
+					// browse the first message
+					Assert.IsTrue(enumeration.MoveNext(), "should have received the first message");
+					Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)enumeration.Current).Text);
+
+					// Receive the first message.
+					Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text);
+					consumer.Close();
+					browser.Close();
+					producer.Close();
+				}
+			}
+		}
+	}
+}

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message