Author: tabish Date: Wed Mar 17 15:49:09 2010 New Revision: 924338 URL: http://svn.apache.org/viewvc?rev=924338&view=rev Log: http://issues.apache.org/activemq/browse/AMQNET-206 Add QueueBrowser implementation based on supplied patch. Changes where needed to better integrate into the codebase. Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs (with props) activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs (with props) Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=924338&r1=924337&r2=924338&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Wed Mar 17 15:49:09 2010 @@ -769,15 +769,15 @@ namespace Apache.NMS.ActiveMQ foreach(Session session in this.sessions) { - try - { - session.ClearMessagesInProgress(); - } - catch(Exception ex) - { - Tracer.Warn("Exception while clearing messages: " + ex.Message); - Tracer.Warn(ex.StackTrace); - } + try + { + session.ClearMessagesInProgress(); + } + catch(Exception ex) + { + Tracer.Warn("Exception while clearing messages: " + ex.Message); + Tracer.Warn(ex.StackTrace); + } } if(this.ConnectionInterruptedListener != null && !this.closing ) @@ -824,14 +824,6 @@ namespace Apache.NMS.ActiveMQ } /// - /// Creates a new temporary destination name - /// - public String CreateTemporaryDestinationName() - { - return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter); - } - - /// /// Creates a new local transaction ID /// public LocalTransactionId CreateLocalTransactionId() @@ -852,6 +844,37 @@ namespace Apache.NMS.ActiveMQ return answer; } + public ActiveMQTempDestination CreateTemporaryDestination(bool topic) + { + ActiveMQTempDestination destination = null; + + if(topic) + { + destination = new ActiveMQTempTopic( + info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter)); + } + else + { + destination = new ActiveMQTempQueue( + info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter)); + } + + DestinationInfo command = new DestinationInfo(); + command.ConnectionId = ConnectionId; + command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add + command.Destination = destination; + + this.SyncRequest(command); + + destination.Connection = this; + + return destination; + } + + protected void CreateTemporaryDestination(ActiveMQDestination tempDestination) + { + } + public void DeleteTemporaryDestination(IDestination destination) { this.DeleteDestination(destination); Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=924338&r1=924337&r2=924338&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Mar 17 15:49:09 2010 @@ -66,11 +66,54 @@ namespace Apache.NMS.ActiveMQ private IRedeliveryPolicy redeliveryPolicy; // Constructor internal to prevent clients from creating an instance. - internal MessageConsumer(Session session, ConsumerInfo info) - { + internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination, + String name, String selector, int prefetch, int maxPendingMessageCount, + bool noLocal, bool browser, bool dispatchAsync ) + { + if(destination == null) + { + throw new InvalidDestinationException("Consumer cannot receive on Null Destinations."); + } + this.session = session; - this.info = info; - this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy; + this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy; + + this.info = new ConsumerInfo(); + this.info.ConsumerId = id; + this.info.Destination = destination; + this.info.SubscriptionName = name; + this.info.Selector = selector; + this.info.PrefetchSize = prefetch; + this.info.MaximumPendingMessageLimit = maxPendingMessageCount; + this.info.NoLocal = noLocal; + this.info.Browser = browser; + this.info.DispatchAsync = dispatchAsync; + this.info.Retroactive = session.Retroactive; + this.info.Exclusive = session.Exclusive; + this.info.Priority = session.Priority; + + // If the destination contained a URI query, then use it to set public properties + // on the ConsumerInfo + if(destination.Options != null) + { + URISupport.SetProperties(this.info, destination.Options, "consumer."); + } + +// try +// { +// this.session.AddConsumer(this); +// this.session.Connection.SyncRequest(this.info); +// +// if(this.session.Connection.IsStarted) +// { +// this.Start(); +// } +// } +// catch(Exception) +// { +// this.session.RemoveConsumer(this.info.ConsumerId); +// throw; +// } } ~MessageConsumer() @@ -87,9 +130,14 @@ namespace Apache.NMS.ActiveMQ public ConsumerId ConsumerId { - get { return info.ConsumerId; } + get { return this.info.ConsumerId; } } + public ConsumerInfo ConsumerInfo + { + get { return this.info; } + } + public int RedeliveryTimeout { get { return redeliveryTimeout; } @@ -106,6 +154,11 @@ namespace Apache.NMS.ActiveMQ get { return this.redeliveryPolicy; } set { this.redeliveryPolicy = value; } } + + public long UnconsumedMessageCount + { + get { return this.unconsumedMessages.Count; } + } #endregion @@ -286,7 +339,7 @@ namespace Apache.NMS.ActiveMQ } this.unconsumedMessages.Close(); - this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId); + this.session.RemoveConsumer(this.info.ConsumerId); RemoveInfo removeCommand = new RemoveInfo(); removeCommand.ObjectId = this.info.ConsumerId; @@ -444,7 +497,7 @@ namespace Apache.NMS.ActiveMQ } } - public void Dispatch(MessageDispatch dispatch) + public virtual void Dispatch(MessageDispatch dispatch) { MessageListener listener = this.listener; Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=924338&r1=924337&r2=924338&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Wed Mar 17 15:49:09 2010 @@ -43,17 +43,28 @@ namespace Apache.NMS.ActiveMQ private bool disableMessageTimestamp = false; protected bool disposed = false; - public MessageProducer(Session session, ProducerInfo info) + public MessageProducer(Session session, ProducerId id, ActiveMQDestination destination, TimeSpan requestTimeout) { - this.session = session; - this.info = info; - this.RequestTimeout = session.RequestTimeout; + this.session = session; + this.RequestTimeout = requestTimeout; + this.info = new ProducerInfo(); + this.info.ProducerId = id; + this.info.Destination = destination; + this.info.WindowSize = session.Connection.ProducerWindowSize; + + // If the destination contained a URI query, then use it to set public + // properties on the ProducerInfo + if(destination != null && destination.Options != null) + { + URISupport.SetProperties(this.info, destination.Options, "producer."); + } + // Version Three and higher will send us a ProducerAck, but only if we // have a set producer window size. - if( session.Connection.ProtocolVersion >= 3 && info.WindowSize > 0 ) + if(session.Connection.ProtocolVersion >= 3 && this.info.WindowSize > 0) { - usage = new MemoryUsage( info.WindowSize ); + this.usage = new MemoryUsage(this.info.WindowSize); } } @@ -120,7 +131,7 @@ namespace Apache.NMS.ActiveMQ try { - session.DisposeOf(info.ProducerId); + session.RemoveProducer(info.ProducerId); } catch(Exception ex) { @@ -231,6 +242,11 @@ namespace Apache.NMS.ActiveMQ get { return info.ProducerId; } } + public ProducerInfo ProducerInfo + { + get { return info; } + } + public MsgDeliveryMode DeliveryMode { get { return msgDeliveryMode; } Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs?rev=924338&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs Wed Mar 17 15:49:09 2010 @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections; +using System.Collections.Generic; +using System.Text; +using Apache.NMS.ActiveMQ.Commands; +using System.Threading; +using Apache.NMS.Util; +using System.Runtime.CompilerServices; +using System.Diagnostics; + +namespace Apache.NMS.ActiveMQ +{ + public class QueueBrowser : IQueueBrowser, IEnumerator + { + private readonly Session session; + private readonly ActiveMQDestination destination; + private readonly string selector; + + private MessageConsumer consumer; + private bool closed; + private readonly ConsumerId consumerId; + private readonly Atomic browseDone = new Atomic(true); + private readonly bool dispatchAsync; + private object semaphore = new object(); + private object myLock = new object(); + + internal QueueBrowser(Session session, ConsumerId consumerId, ActiveMQDestination destination, string selector, bool dispatchAsync) + { + this.session = session; + this.consumerId = consumerId; + this.destination = destination; + this.selector = selector; + this.dispatchAsync = dispatchAsync; + this.consumer = CreateConsumer(); + } + + private MessageConsumer CreateConsumer() + { + this.browseDone.Value = false; + BrowsingMessageConsumer consumer = null; + + try + { + consumer = new BrowsingMessageConsumer( + this, session, this.consumerId, this.destination, null, this.selector, + this.session.Connection.PrefetchPolicy.QueueBrowserPrefetch, + this.session.Connection.PrefetchPolicy.MaximumPendingMessageLimit, + false, true, this.dispatchAsync); + + this.session.AddConsumer(consumer); + this.session.Connection.SyncRequest(consumer.ConsumerInfo); + + if(this.session.Connection.IsStarted) + { + consumer.Start(); + } + } + catch(Exception) + { + if(consumer != null) + { + this.session.RemoveConsumer(consumer.ConsumerId); + consumer.Close(); + } + + throw; + } + + return consumer; + } + + private void DestroyConsumer() + { + if(consumer == null) + { + return; + } + + try + { + consumer.Close(); + consumer = null; + } + catch(NMSException e) + { + Tracer.Debug(e.StackTrace.ToString()); + } + } + + public IEnumerator GetEnumerator() + { + CheckClosed(); + + if(this.consumer == null) + { + this.consumer = CreateConsumer(); + } + + return this; + } + + + private void CheckClosed() + { + if(this.closed) + { + throw new IllegalStateException("The Consumer is closed"); + } + } + + public bool MoveNext() + { + while(true) + { + lock(myLock) + { + if(consumer == null) + { + return false; + } + } + + if(consumer.UnconsumedMessageCount > 0) + { + return true; + } + + if(browseDone.Value || !session.Started) + { + DestroyConsumer(); + return false; + } + + WaitForMessage(); + } + } + + public object Current + { + get + { + while(true) + { + lock(myLock) + { + if(consumer == null) + { + return null; + } + } + + try + { + IMessage answer = consumer.ReceiveNoWait(); + + if(answer != null) + { + return answer; + } + } + catch(NMSException) + { + //TODO: Not implemented. + //this.session.Connection.OnClientInternalException(e); + return null; + } + + if(browseDone.Value || !session.Started) + { + DestroyConsumer(); + return null; + } + + WaitForMessage(); + } + } + } + + [MethodImpl(MethodImplOptions.Synchronized)] + public void Close() + { + DestroyConsumer(); + closed = true; + } + + public IQueue Queue + { + get { return (IQueue)destination; } + } + + public string MessageSelector + { + get { return selector; } + } + + protected void WaitForMessage() + { + try + { + lock(semaphore) + { + Monitor.Wait(semaphore, 2000); + } + } + catch(ThreadInterruptedException) + { + Thread.CurrentThread.Interrupt(); + } + } + + protected void NotifyMessageAvailable() + { + lock(semaphore) + { + Monitor.PulseAll(semaphore); + } + } + + public override string ToString() + { + return "QueueBrowser { value=" + consumerId + " }"; + } + + public void Reset() + { + if(consumer != null) + { + DestroyConsumer(); + } + + consumer = CreateConsumer(); + } + + public class BrowsingMessageConsumer : MessageConsumer + { + private QueueBrowser parent; + + public BrowsingMessageConsumer(QueueBrowser parent, Session session, ConsumerId id, ActiveMQDestination destination, + String name, String selector, int prefetch, int maxPendingMessageCount, + bool noLocal, bool browser, bool dispatchAsync) + : base(session, id, destination, name, selector, prefetch, maxPendingMessageCount, noLocal, browser, dispatchAsync) + { + this.parent = parent; + } + + public override void Dispatch(MessageDispatch md) + { + if(md.Message == null) + { + parent.browseDone.Value = true; + } + else + { + base.Dispatch(md); + } + + parent.NotifyMessageAvailable(); + } + } + } +} Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=924338&r1=924337&r2=924338&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Wed Mar 17 15:49:09 2010 @@ -356,29 +356,32 @@ namespace Apache.NMS.ActiveMQ public IMessageProducer CreateProducer(IDestination destination) { - ProducerInfo command = CreateProducerInfo(destination); - ProducerId producerId = command.ProducerId; MessageProducer producer = null; try { - producer = new MessageProducer(this, command); - producers[producerId] = producer; - this.connection.Oneway(command); + ActiveMQDestination dest = null; + if(destination != null) + { + dest = ActiveMQDestination.Transform(destination); + } + + producer = new MessageProducer(this, GetNextProducerId(), dest, this.RequestTimeout); + + this.AddProducer(producer); + this.Connection.Oneway(producer.ProducerInfo); } catch(Exception) { if(producer != null) { + this.RemoveProducer(producer.ProducerId); producer.Close(); } throw; } - // Registered with Connection so it can process Producer Acks. - connection.addProducer(producerId, producer); - return producer; } @@ -394,78 +397,83 @@ namespace Apache.NMS.ActiveMQ public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal) { - if (destination == null) + if(destination == null) { throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } - ConsumerInfo command = CreateConsumerInfo(destination, selector); - command.NoLocal = noLocal; - ConsumerId consumerId = command.ConsumerId; - MessageConsumer consumer = null; + ActiveMQDestination dest = ActiveMQDestination.Transform(destination); + int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch; - // Registered with Connection before we register at the broker. - connection.addDispatcher(consumerId, this); + if(dest is ITopic || dest is ITemporaryTopic) + { + prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch; + } + else if(dest is IQueue || dest is ITemporaryQueue) + { + prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch; + } + + MessageConsumer consumer = null; try { - consumer = new MessageConsumer(this, command); - // lets register the consumer first in case we start dispatching messages immediately - consumers[consumerId] = consumer; - this.Connection.SyncRequest(command); + consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector, prefetchSize, + this.connection.PrefetchPolicy.MaximumPendingMessageLimit, + noLocal, false, this.connection.DispatchAsync); - if(this.Started) + this.AddConsumer(consumer); + this.Connection.SyncRequest(consumer.ConsumerInfo); + + if(this.Connection.IsStarted) { consumer.Start(); } - - return consumer; } catch(Exception) { if(consumer != null) { + this.RemoveConsumer(consumer.ConsumerId); consumer.Close(); } throw; } + + return consumer; } public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) { - if (destination == null) + if(destination == null) { throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } - ConsumerInfo command = CreateConsumerInfo(destination, selector); - ConsumerId consumerId = command.ConsumerId; - command.SubscriptionName = name; - command.NoLocal = noLocal; - command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch; + ActiveMQDestination dest = ActiveMQDestination.Transform(destination); MessageConsumer consumer = null; - // Registered with Connection before we register at the broker. - connection.addDispatcher(consumerId, this); - try { - consumer = new MessageConsumer(this, command); - // lets register the consumer first in case we start dispatching messages immediately - consumers[consumerId] = consumer; + consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector, + this.connection.PrefetchPolicy.DurableTopicPrefetch, + this.connection.PrefetchPolicy.MaximumPendingMessageLimit, + noLocal, false, this.connection.DispatchAsync); - if(this.Started) + this.AddConsumer(consumer); + this.Connection.SyncRequest(consumer.ConsumerInfo); + + if(this.Connection.IsStarted) { consumer.Start(); } - - this.connection.SyncRequest(command); } catch(Exception) { if(consumer != null) { + this.RemoveConsumer(consumer.ConsumerId); consumer.Close(); } @@ -486,12 +494,34 @@ namespace Apache.NMS.ActiveMQ public IQueueBrowser CreateBrowser(IQueue queue) { - throw new NotSupportedException("Not Yet Implemented"); + return this.CreateBrowser(queue, null); } public IQueueBrowser CreateBrowser(IQueue queue, string selector) { - throw new NotSupportedException("Not Yet Implemented"); + if(queue == null) + { + throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); + } + + ActiveMQDestination dest = ActiveMQDestination.Transform(queue); + QueueBrowser browser = null; + + try + { + browser = new QueueBrowser(this, GetNextConsumerId(), dest, selector, this.DispatchAsync); + } + catch(Exception) + { + if(browser != null) + { + browser.Close(); + } + + throw; + } + + return browser; } public IQueue GetQueue(string name) @@ -506,16 +536,12 @@ namespace Apache.NMS.ActiveMQ public ITemporaryQueue CreateTemporaryQueue() { - ActiveMQTempQueue answer = new ActiveMQTempQueue(Connection.CreateTemporaryDestinationName()); - CreateTemporaryDestination(answer); - return answer; + return (ITemporaryQueue)this.connection.CreateTemporaryDestination(false); } public ITemporaryTopic CreateTemporaryTopic() { - ActiveMQTempTopic answer = new ActiveMQTempTopic(Connection.CreateTemporaryDestinationName()); - CreateTemporaryDestination(answer); - return answer; + return (ITemporaryTopic)this.connection.CreateTemporaryDestination(true); } /// @@ -523,12 +549,7 @@ namespace Apache.NMS.ActiveMQ /// public void DeleteDestination(IDestination destination) { - DestinationInfo command = new DestinationInfo(); - command.ConnectionId = Connection.ConnectionId; - command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove - command.Destination = (ActiveMQDestination) destination; - - this.connection.Oneway(command); + this.connection.DeleteDestination(destination); } public IMessage CreateMessage() @@ -604,16 +625,6 @@ namespace Apache.NMS.ActiveMQ #endregion - protected void CreateTemporaryDestination(ActiveMQDestination tempDestination) - { - DestinationInfo command = new DestinationInfo(); - command.ConnectionId = Connection.ConnectionId; - command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add - command.Destination = tempDestination; - - this.connection.SyncRequest(command); - } - public void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout ) { ActiveMQMessage msg = message; @@ -673,18 +684,33 @@ namespace Apache.NMS.ActiveMQ } } - public void DisposeOf(ConsumerId objectId, long lastDeliveredSequenceId) + public void AddConsumer(MessageConsumer consumer) { - connection.removeDispatcher(objectId); - this.lastDeliveredSequenceId = Math.Min(this.lastDeliveredSequenceId, lastDeliveredSequenceId); + ConsumerId id = consumer.ConsumerId; + // Registered with Connection before we register at the broker. + consumers[id] = consumer; + connection.addDispatcher(id, this); + } + + public void RemoveConsumer(ConsumerId objectId) + { + connection.removeDispatcher(objectId); if(!this.closing) { consumers.Remove(objectId); } } - public void DisposeOf(ProducerId objectId) + public void AddProducer(MessageProducer producer) + { + ProducerId id = producer.ProducerId; + + this.producers[id] = producer; + this.connection.addProducer(id, producer); + } + + public void RemoveProducer(ProducerId objectId) { connection.removeProducer(objectId); if(!this.closing) @@ -693,62 +719,24 @@ namespace Apache.NMS.ActiveMQ } } - protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) + public ConsumerId GetNextConsumerId() { - ConsumerInfo answer = new ConsumerInfo(); ConsumerId id = new ConsumerId(); id.ConnectionId = info.SessionId.ConnectionId; id.SessionId = info.SessionId.Value; id.Value = Interlocked.Increment(ref consumerCounter); - answer.ConsumerId = id; - answer.Destination = ActiveMQDestination.Transform(destination); - answer.Selector = selector; - answer.Priority = this.Priority; - answer.Exclusive = this.Exclusive; - answer.DispatchAsync = this.DispatchAsync; - answer.Retroactive = this.Retroactive; - answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit; - - if(destination is ITopic || destination is ITemporaryTopic) - { - answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch; - } - else if(destination is IQueue || destination is ITemporaryQueue) - { - answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch; - } - - // If the destination contained a URI query, then use it to set public properties - // on the ConsumerInfo - ActiveMQDestination amqDestination = destination as ActiveMQDestination; - if(amqDestination != null && amqDestination.Options != null) - { - URISupport.SetProperties(answer, amqDestination.Options, "consumer."); - } - return answer; + return id; } - protected virtual ProducerInfo CreateProducerInfo(IDestination destination) + public ProducerId GetNextProducerId() { - ProducerInfo answer = new ProducerInfo(); ProducerId id = new ProducerId(); id.ConnectionId = info.SessionId.ConnectionId; id.SessionId = info.SessionId.Value; id.Value = Interlocked.Increment(ref producerCounter); - answer.ProducerId = id; - answer.Destination = ActiveMQDestination.Transform(destination); - answer.WindowSize = connection.ProducerWindowSize; - - // If the destination contained a URI query, then use it to set public - // properties on the ProducerInfo - ActiveMQDestination amqDestination = destination as ActiveMQDestination; - if(amqDestination != null && amqDestination.Options != null) - { - URISupport.SetProperties(answer, amqDestination.Options, "producer."); - } - return answer; + return id; } public void Stop() Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs?rev=924338&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs Wed Mar 17 15:49:09 2010 @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections; +using System.Diagnostics; +using Apache.NMS.ActiveMQ.Commands; +using Apache.NMS.Test; +using NUnit.Framework; + +namespace Apache.NMS.ActiveMQ.Test +{ + [TestFixture] + public class QueueBrowserTests : NMSTestSupport + { + [Test] + public void TestReceiveBrowseReceive() + { + using (IConnection connection = CreateConnection()) + { + using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + { + IDestination destination = session.GetQueue("TestReceiveBrowseReceive"); + IMessageProducer producer = session.CreateProducer(destination); + IMessageConsumer consumer = session.CreateConsumer(destination); + connection.Start(); + + IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"), + session.CreateTextMessage("Second Message"), + session.CreateTextMessage("Third Message")}; + + // lets consume any outstanding messages from previous test runs + while (consumer.Receive(TimeSpan.FromMilliseconds(1000)) != null) + { + } + + producer.Send(outbound[0]); + producer.Send(outbound[1]); + producer.Send(outbound[2]); + + IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000)); + + // Get the first. + Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)msg).Text); + consumer.Close(); + + IQueueBrowser browser = session.CreateBrowser((IQueue)destination); + IEnumerator enumeration = browser.GetEnumerator(); + + // browse the second + Assert.IsTrue(enumeration.MoveNext(), "should have received the second message"); + Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text); + + // browse the third. + Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message"); + Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text); + + // There should be no more. + bool tooMany = false; + while (enumeration.MoveNext()) + { + Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text); + tooMany = true; + } + Assert.IsFalse(tooMany); + + //Reset should take us back to the start. + enumeration.Reset(); + + // browse the second + Assert.IsTrue(enumeration.MoveNext(), "should have received the second message"); + Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text); + + // browse the third. + Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message"); + Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text); + + // There should be no more. + tooMany = false; + while (enumeration.MoveNext()) + { + Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text); + tooMany = true; + } + Assert.IsFalse(tooMany); + + browser.Close(); + + // Re-open the consumer. + consumer = session.CreateConsumer(destination); + + // Receive the second. + Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text); + // Receive the third. + Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text); + consumer.Close(); + } + } + } + + [Test] + public void TestBrowseReceive() + { + using (IConnection connection = CreateConnection()) + { + using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge)) + { + IDestination destination = session.GetQueue("TestBrowseReceive"); + + connection.Start(); + + using(IMessageConsumer purger = session.CreateConsumer(destination)) + { + // lets consume any outstanding messages from previous test runs + while(purger.Receive(TimeSpan.FromMilliseconds(1000)) != null) + { + } + + purger.Close(); + } + + IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"), + session.CreateTextMessage("Second Message"), + session.CreateTextMessage("Third Message")}; + + IMessageProducer producer = session.CreateProducer(destination); + producer.Send(outbound[0]); + + // create browser first + IQueueBrowser browser = session.CreateBrowser((IQueue)destination); + IEnumerator enumeration = browser.GetEnumerator(); + + // create consumer + IMessageConsumer consumer = session.CreateConsumer(destination); + + // browse the first message + Assert.IsTrue(enumeration.MoveNext(), "should have received the first message"); + Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)enumeration.Current).Text); + + // Receive the first message. + Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text); + consumer.Close(); + browser.Close(); + producer.Close(); + } + } + } + } +} Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs ------------------------------------------------------------------------------ svn:eol-style = native