Author: tabish
Date: Wed Mar 17 15:49:09 2010
New Revision: 924338
URL: http://svn.apache.org/viewvc?rev=924338&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQNET-206
Add QueueBrowser implementation based on supplied patch. Changes where needed to better integrate into the codebase.
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs (with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs (with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=924338&r1=924337&r2=924338&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Wed Mar 17 15:49:09 2010
@@ -769,15 +769,15 @@ namespace Apache.NMS.ActiveMQ
foreach(Session session in this.sessions)
{
- try
- {
- session.ClearMessagesInProgress();
- }
- catch(Exception ex)
- {
- Tracer.Warn("Exception while clearing messages: " + ex.Message);
- Tracer.Warn(ex.StackTrace);
- }
+ try
+ {
+ session.ClearMessagesInProgress();
+ }
+ catch(Exception ex)
+ {
+ Tracer.Warn("Exception while clearing messages: " + ex.Message);
+ Tracer.Warn(ex.StackTrace);
+ }
}
if(this.ConnectionInterruptedListener != null && !this.closing )
@@ -824,14 +824,6 @@ namespace Apache.NMS.ActiveMQ
}
///
- /// Creates a new temporary destination name
- ///
- public String CreateTemporaryDestinationName()
- {
- return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter);
- }
-
- ///
/// Creates a new local transaction ID
///
public LocalTransactionId CreateLocalTransactionId()
@@ -852,6 +844,37 @@ namespace Apache.NMS.ActiveMQ
return answer;
}
+ public ActiveMQTempDestination CreateTemporaryDestination(bool topic)
+ {
+ ActiveMQTempDestination destination = null;
+
+ if(topic)
+ {
+ destination = new ActiveMQTempTopic(
+ info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter));
+ }
+ else
+ {
+ destination = new ActiveMQTempQueue(
+ info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter));
+ }
+
+ DestinationInfo command = new DestinationInfo();
+ command.ConnectionId = ConnectionId;
+ command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
+ command.Destination = destination;
+
+ this.SyncRequest(command);
+
+ destination.Connection = this;
+
+ return destination;
+ }
+
+ protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
+ {
+ }
+
public void DeleteTemporaryDestination(IDestination destination)
{
this.DeleteDestination(destination);
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=924338&r1=924337&r2=924338&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Mar 17 15:49:09 2010
@@ -66,11 +66,54 @@ namespace Apache.NMS.ActiveMQ
private IRedeliveryPolicy redeliveryPolicy;
// Constructor internal to prevent clients from creating an instance.
- internal MessageConsumer(Session session, ConsumerInfo info)
- {
+ internal MessageConsumer(Session session, ConsumerId id, ActiveMQDestination destination,
+ String name, String selector, int prefetch, int maxPendingMessageCount,
+ bool noLocal, bool browser, bool dispatchAsync )
+ {
+ if(destination == null)
+ {
+ throw new InvalidDestinationException("Consumer cannot receive on Null Destinations.");
+ }
+
this.session = session;
- this.info = info;
- this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+ this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+
+ this.info = new ConsumerInfo();
+ this.info.ConsumerId = id;
+ this.info.Destination = destination;
+ this.info.SubscriptionName = name;
+ this.info.Selector = selector;
+ this.info.PrefetchSize = prefetch;
+ this.info.MaximumPendingMessageLimit = maxPendingMessageCount;
+ this.info.NoLocal = noLocal;
+ this.info.Browser = browser;
+ this.info.DispatchAsync = dispatchAsync;
+ this.info.Retroactive = session.Retroactive;
+ this.info.Exclusive = session.Exclusive;
+ this.info.Priority = session.Priority;
+
+ // If the destination contained a URI query, then use it to set public properties
+ // on the ConsumerInfo
+ if(destination.Options != null)
+ {
+ URISupport.SetProperties(this.info, destination.Options, "consumer.");
+ }
+
+// try
+// {
+// this.session.AddConsumer(this);
+// this.session.Connection.SyncRequest(this.info);
+//
+// if(this.session.Connection.IsStarted)
+// {
+// this.Start();
+// }
+// }
+// catch(Exception)
+// {
+// this.session.RemoveConsumer(this.info.ConsumerId);
+// throw;
+// }
}
~MessageConsumer()
@@ -87,9 +130,14 @@ namespace Apache.NMS.ActiveMQ
public ConsumerId ConsumerId
{
- get { return info.ConsumerId; }
+ get { return this.info.ConsumerId; }
}
+ public ConsumerInfo ConsumerInfo
+ {
+ get { return this.info; }
+ }
+
public int RedeliveryTimeout
{
get { return redeliveryTimeout; }
@@ -106,6 +154,11 @@ namespace Apache.NMS.ActiveMQ
get { return this.redeliveryPolicy; }
set { this.redeliveryPolicy = value; }
}
+
+ public long UnconsumedMessageCount
+ {
+ get { return this.unconsumedMessages.Count; }
+ }
#endregion
@@ -286,7 +339,7 @@ namespace Apache.NMS.ActiveMQ
}
this.unconsumedMessages.Close();
- this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId);
+ this.session.RemoveConsumer(this.info.ConsumerId);
RemoveInfo removeCommand = new RemoveInfo();
removeCommand.ObjectId = this.info.ConsumerId;
@@ -444,7 +497,7 @@ namespace Apache.NMS.ActiveMQ
}
}
- public void Dispatch(MessageDispatch dispatch)
+ public virtual void Dispatch(MessageDispatch dispatch)
{
MessageListener listener = this.listener;
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=924338&r1=924337&r2=924338&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Wed Mar 17 15:49:09 2010
@@ -43,17 +43,28 @@ namespace Apache.NMS.ActiveMQ
private bool disableMessageTimestamp = false;
protected bool disposed = false;
- public MessageProducer(Session session, ProducerInfo info)
+ public MessageProducer(Session session, ProducerId id, ActiveMQDestination destination, TimeSpan requestTimeout)
{
- this.session = session;
- this.info = info;
- this.RequestTimeout = session.RequestTimeout;
+ this.session = session;
+ this.RequestTimeout = requestTimeout;
+ this.info = new ProducerInfo();
+ this.info.ProducerId = id;
+ this.info.Destination = destination;
+ this.info.WindowSize = session.Connection.ProducerWindowSize;
+
+ // If the destination contained a URI query, then use it to set public
+ // properties on the ProducerInfo
+ if(destination != null && destination.Options != null)
+ {
+ URISupport.SetProperties(this.info, destination.Options, "producer.");
+ }
+
// Version Three and higher will send us a ProducerAck, but only if we
// have a set producer window size.
- if( session.Connection.ProtocolVersion >= 3 && info.WindowSize > 0 )
+ if(session.Connection.ProtocolVersion >= 3 && this.info.WindowSize > 0)
{
- usage = new MemoryUsage( info.WindowSize );
+ this.usage = new MemoryUsage(this.info.WindowSize);
}
}
@@ -120,7 +131,7 @@ namespace Apache.NMS.ActiveMQ
try
{
- session.DisposeOf(info.ProducerId);
+ session.RemoveProducer(info.ProducerId);
}
catch(Exception ex)
{
@@ -231,6 +242,11 @@ namespace Apache.NMS.ActiveMQ
get { return info.ProducerId; }
}
+ public ProducerInfo ProducerInfo
+ {
+ get { return info; }
+ }
+
public MsgDeliveryMode DeliveryMode
{
get { return msgDeliveryMode; }
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs?rev=924338&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs Wed Mar 17 15:49:09 2010
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Text;
+using Apache.NMS.ActiveMQ.Commands;
+using System.Threading;
+using Apache.NMS.Util;
+using System.Runtime.CompilerServices;
+using System.Diagnostics;
+
+namespace Apache.NMS.ActiveMQ
+{
+ public class QueueBrowser : IQueueBrowser, IEnumerator
+ {
+ private readonly Session session;
+ private readonly ActiveMQDestination destination;
+ private readonly string selector;
+
+ private MessageConsumer consumer;
+ private bool closed;
+ private readonly ConsumerId consumerId;
+ private readonly Atomic browseDone = new Atomic(true);
+ private readonly bool dispatchAsync;
+ private object semaphore = new object();
+ private object myLock = new object();
+
+ internal QueueBrowser(Session session, ConsumerId consumerId, ActiveMQDestination destination, string selector, bool dispatchAsync)
+ {
+ this.session = session;
+ this.consumerId = consumerId;
+ this.destination = destination;
+ this.selector = selector;
+ this.dispatchAsync = dispatchAsync;
+ this.consumer = CreateConsumer();
+ }
+
+ private MessageConsumer CreateConsumer()
+ {
+ this.browseDone.Value = false;
+ BrowsingMessageConsumer consumer = null;
+
+ try
+ {
+ consumer = new BrowsingMessageConsumer(
+ this, session, this.consumerId, this.destination, null, this.selector,
+ this.session.Connection.PrefetchPolicy.QueueBrowserPrefetch,
+ this.session.Connection.PrefetchPolicy.MaximumPendingMessageLimit,
+ false, true, this.dispatchAsync);
+
+ this.session.AddConsumer(consumer);
+ this.session.Connection.SyncRequest(consumer.ConsumerInfo);
+
+ if(this.session.Connection.IsStarted)
+ {
+ consumer.Start();
+ }
+ }
+ catch(Exception)
+ {
+ if(consumer != null)
+ {
+ this.session.RemoveConsumer(consumer.ConsumerId);
+ consumer.Close();
+ }
+
+ throw;
+ }
+
+ return consumer;
+ }
+
+ private void DestroyConsumer()
+ {
+ if(consumer == null)
+ {
+ return;
+ }
+
+ try
+ {
+ consumer.Close();
+ consumer = null;
+ }
+ catch(NMSException e)
+ {
+ Tracer.Debug(e.StackTrace.ToString());
+ }
+ }
+
+ public IEnumerator GetEnumerator()
+ {
+ CheckClosed();
+
+ if(this.consumer == null)
+ {
+ this.consumer = CreateConsumer();
+ }
+
+ return this;
+ }
+
+
+ private void CheckClosed()
+ {
+ if(this.closed)
+ {
+ throw new IllegalStateException("The Consumer is closed");
+ }
+ }
+
+ public bool MoveNext()
+ {
+ while(true)
+ {
+ lock(myLock)
+ {
+ if(consumer == null)
+ {
+ return false;
+ }
+ }
+
+ if(consumer.UnconsumedMessageCount > 0)
+ {
+ return true;
+ }
+
+ if(browseDone.Value || !session.Started)
+ {
+ DestroyConsumer();
+ return false;
+ }
+
+ WaitForMessage();
+ }
+ }
+
+ public object Current
+ {
+ get
+ {
+ while(true)
+ {
+ lock(myLock)
+ {
+ if(consumer == null)
+ {
+ return null;
+ }
+ }
+
+ try
+ {
+ IMessage answer = consumer.ReceiveNoWait();
+
+ if(answer != null)
+ {
+ return answer;
+ }
+ }
+ catch(NMSException)
+ {
+ //TODO: Not implemented.
+ //this.session.Connection.OnClientInternalException(e);
+ return null;
+ }
+
+ if(browseDone.Value || !session.Started)
+ {
+ DestroyConsumer();
+ return null;
+ }
+
+ WaitForMessage();
+ }
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.Synchronized)]
+ public void Close()
+ {
+ DestroyConsumer();
+ closed = true;
+ }
+
+ public IQueue Queue
+ {
+ get { return (IQueue)destination; }
+ }
+
+ public string MessageSelector
+ {
+ get { return selector; }
+ }
+
+ protected void WaitForMessage()
+ {
+ try
+ {
+ lock(semaphore)
+ {
+ Monitor.Wait(semaphore, 2000);
+ }
+ }
+ catch(ThreadInterruptedException)
+ {
+ Thread.CurrentThread.Interrupt();
+ }
+ }
+
+ protected void NotifyMessageAvailable()
+ {
+ lock(semaphore)
+ {
+ Monitor.PulseAll(semaphore);
+ }
+ }
+
+ public override string ToString()
+ {
+ return "QueueBrowser { value=" + consumerId + " }";
+ }
+
+ public void Reset()
+ {
+ if(consumer != null)
+ {
+ DestroyConsumer();
+ }
+
+ consumer = CreateConsumer();
+ }
+
+ public class BrowsingMessageConsumer : MessageConsumer
+ {
+ private QueueBrowser parent;
+
+ public BrowsingMessageConsumer(QueueBrowser parent, Session session, ConsumerId id, ActiveMQDestination destination,
+ String name, String selector, int prefetch, int maxPendingMessageCount,
+ bool noLocal, bool browser, bool dispatchAsync)
+ : base(session, id, destination, name, selector, prefetch, maxPendingMessageCount, noLocal, browser, dispatchAsync)
+ {
+ this.parent = parent;
+ }
+
+ public override void Dispatch(MessageDispatch md)
+ {
+ if(md.Message == null)
+ {
+ parent.browseDone.Value = true;
+ }
+ else
+ {
+ base.Dispatch(md);
+ }
+
+ parent.NotifyMessageAvailable();
+ }
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/QueueBrowser.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=924338&r1=924337&r2=924338&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Wed Mar 17 15:49:09 2010
@@ -356,29 +356,32 @@ namespace Apache.NMS.ActiveMQ
public IMessageProducer CreateProducer(IDestination destination)
{
- ProducerInfo command = CreateProducerInfo(destination);
- ProducerId producerId = command.ProducerId;
MessageProducer producer = null;
try
{
- producer = new MessageProducer(this, command);
- producers[producerId] = producer;
- this.connection.Oneway(command);
+ ActiveMQDestination dest = null;
+ if(destination != null)
+ {
+ dest = ActiveMQDestination.Transform(destination);
+ }
+
+ producer = new MessageProducer(this, GetNextProducerId(), dest, this.RequestTimeout);
+
+ this.AddProducer(producer);
+ this.Connection.Oneway(producer.ProducerInfo);
}
catch(Exception)
{
if(producer != null)
{
+ this.RemoveProducer(producer.ProducerId);
producer.Close();
}
throw;
}
- // Registered with Connection so it can process Producer Acks.
- connection.addProducer(producerId, producer);
-
return producer;
}
@@ -394,78 +397,83 @@ namespace Apache.NMS.ActiveMQ
public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
{
- if (destination == null)
+ if(destination == null)
{
throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
}
- ConsumerInfo command = CreateConsumerInfo(destination, selector);
- command.NoLocal = noLocal;
- ConsumerId consumerId = command.ConsumerId;
- MessageConsumer consumer = null;
+ ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
+ int prefetchSize = this.Connection.PrefetchPolicy.DurableTopicPrefetch;
- // Registered with Connection before we register at the broker.
- connection.addDispatcher(consumerId, this);
+ if(dest is ITopic || dest is ITemporaryTopic)
+ {
+ prefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
+ }
+ else if(dest is IQueue || dest is ITemporaryQueue)
+ {
+ prefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
+ }
+
+ MessageConsumer consumer = null;
try
{
- consumer = new MessageConsumer(this, command);
- // lets register the consumer first in case we start dispatching messages immediately
- consumers[consumerId] = consumer;
- this.Connection.SyncRequest(command);
+ consumer = new MessageConsumer(this, GetNextConsumerId(), dest, null, selector, prefetchSize,
+ this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
+ noLocal, false, this.connection.DispatchAsync);
- if(this.Started)
+ this.AddConsumer(consumer);
+ this.Connection.SyncRequest(consumer.ConsumerInfo);
+
+ if(this.Connection.IsStarted)
{
consumer.Start();
}
-
- return consumer;
}
catch(Exception)
{
if(consumer != null)
{
+ this.RemoveConsumer(consumer.ConsumerId);
consumer.Close();
}
throw;
}
+
+ return consumer;
}
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
- if (destination == null)
+ if(destination == null)
{
throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
}
- ConsumerInfo command = CreateConsumerInfo(destination, selector);
- ConsumerId consumerId = command.ConsumerId;
- command.SubscriptionName = name;
- command.NoLocal = noLocal;
- command.PrefetchSize = this.connection.PrefetchPolicy.DurableTopicPrefetch;
+ ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
MessageConsumer consumer = null;
- // Registered with Connection before we register at the broker.
- connection.addDispatcher(consumerId, this);
-
try
{
- consumer = new MessageConsumer(this, command);
- // lets register the consumer first in case we start dispatching messages immediately
- consumers[consumerId] = consumer;
+ consumer = new MessageConsumer(this, GetNextConsumerId(), dest, name, selector,
+ this.connection.PrefetchPolicy.DurableTopicPrefetch,
+ this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
+ noLocal, false, this.connection.DispatchAsync);
- if(this.Started)
+ this.AddConsumer(consumer);
+ this.Connection.SyncRequest(consumer.ConsumerInfo);
+
+ if(this.Connection.IsStarted)
{
consumer.Start();
}
-
- this.connection.SyncRequest(command);
}
catch(Exception)
{
if(consumer != null)
{
+ this.RemoveConsumer(consumer.ConsumerId);
consumer.Close();
}
@@ -486,12 +494,34 @@ namespace Apache.NMS.ActiveMQ
public IQueueBrowser CreateBrowser(IQueue queue)
{
- throw new NotSupportedException("Not Yet Implemented");
+ return this.CreateBrowser(queue, null);
}
public IQueueBrowser CreateBrowser(IQueue queue, string selector)
{
- throw new NotSupportedException("Not Yet Implemented");
+ if(queue == null)
+ {
+ throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
+ }
+
+ ActiveMQDestination dest = ActiveMQDestination.Transform(queue);
+ QueueBrowser browser = null;
+
+ try
+ {
+ browser = new QueueBrowser(this, GetNextConsumerId(), dest, selector, this.DispatchAsync);
+ }
+ catch(Exception)
+ {
+ if(browser != null)
+ {
+ browser.Close();
+ }
+
+ throw;
+ }
+
+ return browser;
}
public IQueue GetQueue(string name)
@@ -506,16 +536,12 @@ namespace Apache.NMS.ActiveMQ
public ITemporaryQueue CreateTemporaryQueue()
{
- ActiveMQTempQueue answer = new ActiveMQTempQueue(Connection.CreateTemporaryDestinationName());
- CreateTemporaryDestination(answer);
- return answer;
+ return (ITemporaryQueue)this.connection.CreateTemporaryDestination(false);
}
public ITemporaryTopic CreateTemporaryTopic()
{
- ActiveMQTempTopic answer = new ActiveMQTempTopic(Connection.CreateTemporaryDestinationName());
- CreateTemporaryDestination(answer);
- return answer;
+ return (ITemporaryTopic)this.connection.CreateTemporaryDestination(true);
}
///
@@ -523,12 +549,7 @@ namespace Apache.NMS.ActiveMQ
///
public void DeleteDestination(IDestination destination)
{
- DestinationInfo command = new DestinationInfo();
- command.ConnectionId = Connection.ConnectionId;
- command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
- command.Destination = (ActiveMQDestination) destination;
-
- this.connection.Oneway(command);
+ this.connection.DeleteDestination(destination);
}
public IMessage CreateMessage()
@@ -604,16 +625,6 @@ namespace Apache.NMS.ActiveMQ
#endregion
- protected void CreateTemporaryDestination(ActiveMQDestination tempDestination)
- {
- DestinationInfo command = new DestinationInfo();
- command.ConnectionId = Connection.ConnectionId;
- command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
- command.Destination = tempDestination;
-
- this.connection.SyncRequest(command);
- }
-
public void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout )
{
ActiveMQMessage msg = message;
@@ -673,18 +684,33 @@ namespace Apache.NMS.ActiveMQ
}
}
- public void DisposeOf(ConsumerId objectId, long lastDeliveredSequenceId)
+ public void AddConsumer(MessageConsumer consumer)
{
- connection.removeDispatcher(objectId);
- this.lastDeliveredSequenceId = Math.Min(this.lastDeliveredSequenceId, lastDeliveredSequenceId);
+ ConsumerId id = consumer.ConsumerId;
+ // Registered with Connection before we register at the broker.
+ consumers[id] = consumer;
+ connection.addDispatcher(id, this);
+ }
+
+ public void RemoveConsumer(ConsumerId objectId)
+ {
+ connection.removeDispatcher(objectId);
if(!this.closing)
{
consumers.Remove(objectId);
}
}
- public void DisposeOf(ProducerId objectId)
+ public void AddProducer(MessageProducer producer)
+ {
+ ProducerId id = producer.ProducerId;
+
+ this.producers[id] = producer;
+ this.connection.addProducer(id, producer);
+ }
+
+ public void RemoveProducer(ProducerId objectId)
{
connection.removeProducer(objectId);
if(!this.closing)
@@ -693,62 +719,24 @@ namespace Apache.NMS.ActiveMQ
}
}
- protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector)
+ public ConsumerId GetNextConsumerId()
{
- ConsumerInfo answer = new ConsumerInfo();
ConsumerId id = new ConsumerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
id.Value = Interlocked.Increment(ref consumerCounter);
- answer.ConsumerId = id;
- answer.Destination = ActiveMQDestination.Transform(destination);
- answer.Selector = selector;
- answer.Priority = this.Priority;
- answer.Exclusive = this.Exclusive;
- answer.DispatchAsync = this.DispatchAsync;
- answer.Retroactive = this.Retroactive;
- answer.MaximumPendingMessageLimit = this.connection.PrefetchPolicy.MaximumPendingMessageLimit;
-
- if(destination is ITopic || destination is ITemporaryTopic)
- {
- answer.PrefetchSize = this.connection.PrefetchPolicy.TopicPrefetch;
- }
- else if(destination is IQueue || destination is ITemporaryQueue)
- {
- answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch;
- }
-
- // If the destination contained a URI query, then use it to set public properties
- // on the ConsumerInfo
- ActiveMQDestination amqDestination = destination as ActiveMQDestination;
- if(amqDestination != null && amqDestination.Options != null)
- {
- URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
- }
- return answer;
+ return id;
}
- protected virtual ProducerInfo CreateProducerInfo(IDestination destination)
+ public ProducerId GetNextProducerId()
{
- ProducerInfo answer = new ProducerInfo();
ProducerId id = new ProducerId();
id.ConnectionId = info.SessionId.ConnectionId;
id.SessionId = info.SessionId.Value;
id.Value = Interlocked.Increment(ref producerCounter);
- answer.ProducerId = id;
- answer.Destination = ActiveMQDestination.Transform(destination);
- answer.WindowSize = connection.ProducerWindowSize;
-
- // If the destination contained a URI query, then use it to set public
- // properties on the ProducerInfo
- ActiveMQDestination amqDestination = destination as ActiveMQDestination;
- if(amqDestination != null && amqDestination.Options != null)
- {
- URISupport.SetProperties(answer, amqDestination.Options, "producer.");
- }
- return answer;
+ return id;
}
public void Stop()
Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs?rev=924338&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs Wed Mar 17 15:49:09 2010
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Diagnostics;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Test;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+ [TestFixture]
+ public class QueueBrowserTests : NMSTestSupport
+ {
+ [Test]
+ public void TestReceiveBrowseReceive()
+ {
+ using (IConnection connection = CreateConnection())
+ {
+ using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ IDestination destination = session.GetQueue("TestReceiveBrowseReceive");
+ IMessageProducer producer = session.CreateProducer(destination);
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+ connection.Start();
+
+ IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"),
+ session.CreateTextMessage("Second Message"),
+ session.CreateTextMessage("Third Message")};
+
+ // lets consume any outstanding messages from previous test runs
+ while (consumer.Receive(TimeSpan.FromMilliseconds(1000)) != null)
+ {
+ }
+
+ producer.Send(outbound[0]);
+ producer.Send(outbound[1]);
+ producer.Send(outbound[2]);
+
+ IMessage msg = consumer.Receive(TimeSpan.FromMilliseconds(1000));
+
+ // Get the first.
+ Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)msg).Text);
+ consumer.Close();
+
+ IQueueBrowser browser = session.CreateBrowser((IQueue)destination);
+ IEnumerator enumeration = browser.GetEnumerator();
+
+ // browse the second
+ Assert.IsTrue(enumeration.MoveNext(), "should have received the second message");
+ Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text);
+
+ // browse the third.
+ Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message");
+ Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text);
+
+ // There should be no more.
+ bool tooMany = false;
+ while (enumeration.MoveNext())
+ {
+ Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text);
+ tooMany = true;
+ }
+ Assert.IsFalse(tooMany);
+
+ //Reset should take us back to the start.
+ enumeration.Reset();
+
+ // browse the second
+ Assert.IsTrue(enumeration.MoveNext(), "should have received the second message");
+ Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)enumeration.Current).Text);
+
+ // browse the third.
+ Assert.IsTrue(enumeration.MoveNext(), "Should have received the third message");
+ Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)enumeration.Current).Text);
+
+ // There should be no more.
+ tooMany = false;
+ while (enumeration.MoveNext())
+ {
+ Debug.WriteLine("Got extra message: " + ((ITextMessage)enumeration.Current).Text);
+ tooMany = true;
+ }
+ Assert.IsFalse(tooMany);
+
+ browser.Close();
+
+ // Re-open the consumer.
+ consumer = session.CreateConsumer(destination);
+
+ // Receive the second.
+ Assert.AreEqual(((ITextMessage)outbound[1]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text);
+ // Receive the third.
+ Assert.AreEqual(((ITextMessage)outbound[2]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text);
+ consumer.Close();
+ }
+ }
+ }
+
+ [Test]
+ public void TestBrowseReceive()
+ {
+ using (IConnection connection = CreateConnection())
+ {
+ using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ {
+ IDestination destination = session.GetQueue("TestBrowseReceive");
+
+ connection.Start();
+
+ using(IMessageConsumer purger = session.CreateConsumer(destination))
+ {
+ // lets consume any outstanding messages from previous test runs
+ while(purger.Receive(TimeSpan.FromMilliseconds(1000)) != null)
+ {
+ }
+
+ purger.Close();
+ }
+
+ IMessage[] outbound = new IMessage[]{session.CreateTextMessage("First Message"),
+ session.CreateTextMessage("Second Message"),
+ session.CreateTextMessage("Third Message")};
+
+ IMessageProducer producer = session.CreateProducer(destination);
+ producer.Send(outbound[0]);
+
+ // create browser first
+ IQueueBrowser browser = session.CreateBrowser((IQueue)destination);
+ IEnumerator enumeration = browser.GetEnumerator();
+
+ // create consumer
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ // browse the first message
+ Assert.IsTrue(enumeration.MoveNext(), "should have received the first message");
+ Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)enumeration.Current).Text);
+
+ // Receive the first message.
+ Assert.AreEqual(((ITextMessage)outbound[0]).Text, ((ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000))).Text);
+ consumer.Close();
+ browser.Close();
+ producer.Close();
+ }
+ }
+ }
+ }
+}
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/QueueBrowserTests.cs
------------------------------------------------------------------------------
svn:eol-style = native