Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 84915 invoked from network); 18 Oct 2006 15:14:45 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (209.237.227.199) by minotaur.apache.org with SMTP; 18 Oct 2006 15:14:45 -0000 Received: (qmail 66483 invoked by uid 500); 18 Oct 2006 15:14:45 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 66456 invoked by uid 500); 18 Oct 2006 15:14:45 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 66446 invoked by uid 99); 18 Oct 2006 15:14:45 -0000 Received: from asf.osuosl.org (HELO asf.osuosl.org) (140.211.166.49) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Oct 2006 08:14:45 -0700 X-ASF-Spam-Status: No, hits=-8.6 required=10.0 tests=ALL_TRUSTED,INFO_TLD,NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (asf.osuosl.org: local policy) Received: from [140.211.166.113] (HELO eris.apache.org) (140.211.166.113) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Oct 2006 08:14:43 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 68F091A981A; Wed, 18 Oct 2006 08:14:23 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r465270 - in /incubator/activemq/activemq-dotnet/trunk/src/main/csharp: ActiveMQ/Session.cs NMS/ISession.cs Date: Wed, 18 Oct 2006 15:14:22 -0000 To: activemq-commits@geronimo.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061018151423.68F091A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org X-Spam-Rating: minotaur.apache.org 1.6.2 0/1000/N Author: jstrachan Date: Wed Oct 18 08:14:17 2006 New Revision: 465270 URL: 
http://svn.apache.org/viewvc?view=rev&rev=465270 Log: removed ActiveMQ-specific properties from the ISession API to keep it nice and clean; folks can use the ActiveMQ Session class if they want to explicitly configure ActiveMQ-specific things Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs?view=diff&rev=465270&r1=465269&r2=465270 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/ActiveMQ/Session.cs Wed Oct 18 08:14:17 2006 @@ -19,390 +19,411 @@ using System; using System.Collections; -namespace ActiveMQ -{ - /// - /// Default provider of ISession - /// - public class Session : ISession - { - private Connection connection; - private SessionInfo info; - private AcknowledgementMode acknowledgementMode; - private long consumerCounter; - private long producerCounter; - private int prefetchSize = 1000; - private int maximumPendingMessageLimit; - private byte priority; - private bool dispatchAsync; - private bool exclusive; - private bool retroactive; - private IDictionary consumers = Hashtable.Synchronized(new Hashtable()); - private TransactionContext transactionContext; - - public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) - { - this.connection = connection; - this.info = info; - this.acknowledgementMode = acknowledgementMode; - transactionContext = new TransactionContext(this); - } - - public int PrefetchSize +namespace ActiveMQ { + /// + /// Default provider of ISession + /// + public class Session : ISession { - get { return 
prefetchSize; } - set { this.prefetchSize = value; } - } + private Connection connection; + private SessionInfo info; + private AcknowledgementMode acknowledgementMode; + private long consumerCounter; + private long producerCounter; + private int prefetchSize = 1000; + private int maximumPendingMessageLimit; + private byte priority; + private bool dispatchAsync; + private bool exclusive; + private bool retroactive; + private IDictionary consumers = Hashtable.Synchronized(new Hashtable()); + private TransactionContext transactionContext; - public int MaximumPendingMessageLimit - { - get { return maximumPendingMessageLimit; } - set { this.maximumPendingMessageLimit = value; } - } + public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) + { + this.connection = connection; + this.info = info; + this.acknowledgementMode = acknowledgementMode; + transactionContext = new TransactionContext(this); + } - public bool DispatchAsync - { - get { return dispatchAsync; } - set { this.dispatchAsync = value; } - } - public bool Exclusive - { - get { return exclusive; } - set { this.exclusive = value; } - } + /// + /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers + /// until acknowledgements are received. + /// + public int PrefetchSize { + get { return prefetchSize; } + set { this.prefetchSize = value; } + } - public bool Retroactive - { - get { return retroactive; } - set { this.retroactive = value; } - } + /// + /// Sets the maximum number of messages to keep around per consumer + /// in addition to the prefetch window for non-durable topics until messages + /// will start to be evicted for slow consumers. 
+ /// Must be > 0 to enable this feature + /// + public int MaximumPendingMessageLimit { + get { return maximumPendingMessageLimit; } + set { this.maximumPendingMessageLimit = value; } + } - public byte Priority - { - get { return priority; } - set { this.priority = value; } - } - - public void Dispose() - { - connection.DisposeOf(info.SessionId); - } - - public IMessageProducer CreateProducer() - { - return CreateProducer(null); - } - - public IMessageProducer CreateProducer(IDestination destination) - { - ProducerInfo command = CreateProducerInfo(destination); - connection.SyncRequest(command); - return new MessageProducer(this, command); - } - - - - public IMessageConsumer CreateConsumer(IDestination destination) - { - return CreateConsumer(destination, null); - } - - public IMessageConsumer CreateConsumer(IDestination destination, string selector) - { - ConsumerInfo command = CreateConsumerInfo(destination, selector); - ConsumerId consumerId = command.ConsumerId; - - try - { - MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode); - // lets register the consumer first in case we start dispatching messages immediately - connection.AddConsumer(consumerId, consumer); - - connection.SyncRequest(command); - - consumers[consumerId] = consumer; - return consumer; - } - catch (Exception e) - { - connection.RemoveConsumer(consumerId); - throw e; - } - } - - public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) - { - ConsumerInfo command = CreateConsumerInfo(destination, selector); - ConsumerId consumerId = command.ConsumerId; - command.SubcriptionName = name; - command.NoLocal = noLocal; - - try - { - MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode); - // lets register the consumer first in case we start dispatching messages immediately - connection.AddConsumer(consumerId, consumer); - - connection.SyncRequest(command); - - consumers[consumerId] = 
consumer; - return consumer; - } - catch (Exception e) - { - connection.RemoveConsumer(consumerId); - throw e; - } - } - - public IQueue GetQueue(string name) - { - return new ActiveMQQueue(name); - } - - public ITopic GetTopic(string name) - { - return new ActiveMQTopic(name); - } - - public ITemporaryQueue CreateTemporaryQueue() - { - ActiveMQTempQueue answer = new ActiveMQTempQueue(connection.CreateTemporaryDestinationName()); - CreateTemporaryDestination(answer); - return answer; - } - - public ITemporaryTopic CreateTemporaryTopic() - { - ActiveMQTempTopic answer = new ActiveMQTempTopic(connection.CreateTemporaryDestinationName()); - CreateTemporaryDestination(answer); - return answer; - } + /// + /// Enables or disables whether asynchronous dispatch should be used by the broker + /// + public bool DispatchAsync { + get { return dispatchAsync; } + set { this.dispatchAsync = value; } + } - protected void CreateTemporaryDestination(ActiveMQDestination tempDestination) - { - DestinationInfo command = new DestinationInfo(); - command.ConnectionId = connection.ConnectionId; - command.OperationType = 0; // 0 is add - command.Destination = tempDestination; + /// + /// Enables or disables exclusive consumers when using queues. An exclusive consumer means + /// only one instance of a consumer is allowed to process messages on a queue to preserve order + /// + public bool Exclusive { + get { return exclusive; } + set { this.exclusive = value; } + } - connection.SyncRequest(command); - } - - protected void DestroyTemporaryDestination(ActiveMQDestination tempDestination) - { - DestinationInfo command = new DestinationInfo(); - command.ConnectionId = connection.ConnectionId; - command.OperationType = 1; // 1 is remove - command.Destination = tempDestination; + /// + /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not? 
+ /// + public bool Retroactive { + get { return retroactive; } + set { this.retroactive = value; } + } - connection.SyncRequest(command); - } - - - public IMessage CreateMessage() - { - ActiveMQMessage answer = new ActiveMQMessage(); - Configure(answer); - return answer; - } - - - public ITextMessage CreateTextMessage() - { - ActiveMQTextMessage answer = new ActiveMQTextMessage(); - Configure(answer); - return answer; - } - - public ITextMessage CreateTextMessage(string text) - { - ActiveMQTextMessage answer = new ActiveMQTextMessage(text); - Configure(answer); - return answer; - } - - public IMapMessage CreateMapMessage() - { - return new ActiveMQMapMessage(); - } - - public IBytesMessage CreateBytesMessage() - { - return new ActiveMQBytesMessage(); - } - - public IBytesMessage CreateBytesMessage(byte[] body) - { - ActiveMQBytesMessage answer = new ActiveMQBytesMessage(); - answer.Content = body; - return answer; - } - - public void Commit() - { - if (! Transacted) - { - throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode); - } - transactionContext.Commit(); - } - - public void Rollback() - { - if (! Transacted) - { - throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. 
Acknowlegement mode is: " + acknowledgementMode); - } - transactionContext.Rollback(); - - // lets ensure all the consumers redeliver any rolled back messages - foreach (MessageConsumer consumer in GetConsumers()) - { - consumer.RedeliverRolledBackMessages(); - } - } - - - - // Properties - - public Connection Connection - { - get { return connection; } - } - - public SessionId SessionId - { - get { return info.SessionId; } - } - - public bool Transacted - { - get { return acknowledgementMode == AcknowledgementMode.Transactional; } - } - - public TransactionContext TransactionContext - { - get { return transactionContext; } - } - - // Implementation methods - public void DoSend(IDestination destination, IMessage message) - { - ActiveMQMessage command = ActiveMQMessage.Transform(message); - // TODO complete packet - connection.SyncRequest(command); - } - - /// - /// Ensures that a transaction is started - /// - public void DoStartTransaction() - { - if (Transacted) - { - transactionContext.Begin(); - } - } - - public void DisposeOf(ConsumerId objectId) - { - consumers.Remove(objectId); - connection.RemoveConsumer(objectId); - connection.DisposeOf(objectId); - } - - public void DispatchAsyncMessages(object state) - { - // lets iterate through each consumer created by this session - // ensuring that they have all pending messages dispatched - lock (this) - { - // lets ensure that only 1 thread dispatches messages in a consumer at once - - foreach (MessageConsumer consumer in GetConsumers()) + /// + /// Sets the default consumer priority for consumers + /// + public byte Priority { + get { return priority; } + set { this.priority = value; } + } + + public void Dispose() { - consumer.DispatchAsyncMessages(); + connection.DisposeOf(info.SessionId); } - } - } + public IMessageProducer CreateProducer() + { + return CreateProducer(null); + } - /// - /// Returns a copy of the current consumers in a thread safe way to avoid concurrency - /// problems if the consumers are 
changed in another thread - /// - protected ICollection GetConsumers() - { - lock (consumers.SyncRoot) - { - return new ArrayList(consumers.Values); - } - } - - protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) - { - ConsumerInfo answer = new ConsumerInfo(); - ConsumerId id = new ConsumerId(); - id.ConnectionId = info.SessionId.ConnectionId; - id.SessionId = info.SessionId.Value; - lock (this) - { - id.Value = ++consumerCounter; - } - answer.ConsumerId = id; - answer.Destination = ActiveMQDestination.Transform(destination); - answer.Selector = selector; - answer.PrefetchSize = prefetchSize; - answer.Priority = priority; - answer.Exclusive = exclusive; - answer.DispatchAsync = dispatchAsync; - answer.Retroactive = retroactive; - - // If the destination contained a URI query, then use it to set public properties - // on the ConsumerInfo - ActiveMQDestination amqDestination = destination as ActiveMQDestination; - if (amqDestination != null && amqDestination.Options != null) - { - Util.URISupport.SetProperties(answer, amqDestination.Options, "consumer."); - } - - return answer; - } - - protected virtual ProducerInfo CreateProducerInfo(IDestination destination) - { - ProducerInfo answer = new ProducerInfo(); - ProducerId id = new ProducerId(); - id.ConnectionId = info.SessionId.ConnectionId; - id.SessionId = info.SessionId.Value; - lock (this) - { - id.Value = ++producerCounter; - } - answer.ProducerId = id; - answer.Destination = ActiveMQDestination.Transform(destination); - - answer.Destination = ActiveMQDestination.Transform(destination); - - // If the destination contained a URI query, then use it to set public - // properties on the ProducerInfo - ActiveMQDestination amqDestination = destination as ActiveMQDestination; - if (amqDestination != null && amqDestination.Options != null) - { - Util.URISupport.SetProperties(answer, amqDestination.Options, "producer."); - } + public IMessageProducer CreateProducer(IDestination 
destination) + { + ProducerInfo command = CreateProducerInfo(destination); + connection.SyncRequest(command); + return new MessageProducer(this, command); + } - return answer; - } - - /// - /// Configures the message command - /// - protected void Configure(ActiveMQMessage message) - { + + + public IMessageConsumer CreateConsumer(IDestination destination) + { + return CreateConsumer(destination, null); + } + + public IMessageConsumer CreateConsumer(IDestination destination, string selector) + { + ConsumerInfo command = CreateConsumerInfo(destination, selector); + ConsumerId consumerId = command.ConsumerId; + + try + { + MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode); + // lets register the consumer first in case we start dispatching messages immediately + connection.AddConsumer(consumerId, consumer); + + connection.SyncRequest(command); + + consumers[consumerId] = consumer; + return consumer; + } + catch (Exception e) + { + connection.RemoveConsumer(consumerId); + throw e; + } + } + + public IMessageConsumer CreateDurableConsumer( + ITopic destination, + string name, + string selector, + bool noLocal) + { + ConsumerInfo command = CreateConsumerInfo(destination, selector); + ConsumerId consumerId = command.ConsumerId; + command.SubcriptionName = name; + command.NoLocal = noLocal; + + try + { + MessageConsumer consumer = new MessageConsumer(this, command, acknowledgementMode); + // lets register the consumer first in case we start dispatching messages immediately + connection.AddConsumer(consumerId, consumer); + + connection.SyncRequest(command); + + consumers[consumerId] = consumer; + return consumer; + } + catch (Exception e) + { + connection.RemoveConsumer(consumerId); + throw e; + } + } + + public IQueue GetQueue(string name) + { + return new ActiveMQQueue(name); + } + + public ITopic GetTopic(string name) + { + return new ActiveMQTopic(name); + } + + public ITemporaryQueue CreateTemporaryQueue() + { + ActiveMQTempQueue answer 
= new ActiveMQTempQueue(connection.CreateTemporaryDestinationName()); + CreateTemporaryDestination(answer); + return answer; + } + + public ITemporaryTopic CreateTemporaryTopic() + { + ActiveMQTempTopic answer = new ActiveMQTempTopic(connection.CreateTemporaryDestinationName()); + CreateTemporaryDestination(answer); + return answer; + } + + protected void CreateTemporaryDestination(ActiveMQDestination tempDestination) + { + DestinationInfo command = new DestinationInfo(); + command.ConnectionId = connection.ConnectionId; + command.OperationType = 0; // 0 is add + command.Destination = tempDestination; + + connection.SyncRequest(command); + } + + protected void DestroyTemporaryDestination(ActiveMQDestination tempDestination) + { + DestinationInfo command = new DestinationInfo(); + command.ConnectionId = connection.ConnectionId; + command.OperationType = 1; // 1 is remove + command.Destination = tempDestination; + + connection.SyncRequest(command); + } + + + public IMessage CreateMessage() + { + ActiveMQMessage answer = new ActiveMQMessage(); + Configure(answer); + return answer; + } + + + public ITextMessage CreateTextMessage() + { + ActiveMQTextMessage answer = new ActiveMQTextMessage(); + Configure(answer); + return answer; + } + + public ITextMessage CreateTextMessage(string text) + { + ActiveMQTextMessage answer = new ActiveMQTextMessage(text); + Configure(answer); + return answer; + } + + public IMapMessage CreateMapMessage() + { + return new ActiveMQMapMessage(); + } + + public IBytesMessage CreateBytesMessage() + { + return new ActiveMQBytesMessage(); + } + + public IBytesMessage CreateBytesMessage(byte[] body) + { + ActiveMQBytesMessage answer = new ActiveMQBytesMessage(); + answer.Content = body; + return answer; + } + + public void Commit() + { + if (!Transacted) + { + throw new InvalidOperationException( + "You cannot perform a Commit() on a non-transacted session. 
Acknowlegement mode is: " + + acknowledgementMode); + } + transactionContext.Commit(); + } + + public void Rollback() + { + if (!Transacted) + { + throw new InvalidOperationException( + "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + + acknowledgementMode); + } + transactionContext.Rollback(); + + // lets ensure all the consumers redeliver any rolled back messages + foreach (MessageConsumer consumer in GetConsumers()) + { + consumer.RedeliverRolledBackMessages(); + } + } + + + + // Properties + + public Connection Connection { + get { return connection; } + } + + public SessionId SessionId { + get { return info.SessionId; } + } + + public bool Transacted { + get { return acknowledgementMode == AcknowledgementMode.Transactional; } + } + + public TransactionContext TransactionContext { + get { return transactionContext; } + } + + // Implementation methods + public void DoSend(IDestination destination, IMessage message) + { + ActiveMQMessage command = ActiveMQMessage.Transform(message); + // TODO complete packet + connection.SyncRequest(command); + } + + /// + /// Ensures that a transaction is started + /// + public void DoStartTransaction() + { + if (Transacted) + { + transactionContext.Begin(); + } + } + + public void DisposeOf(ConsumerId objectId) + { + consumers.Remove(objectId); + connection.RemoveConsumer(objectId); + connection.DisposeOf(objectId); + } + + public void DispatchAsyncMessages(object state) + { + // lets iterate through each consumer created by this session + // ensuring that they have all pending messages dispatched + lock (this) + { + // lets ensure that only 1 thread dispatches messages in a consumer at once + + foreach (MessageConsumer consumer in GetConsumers()) + { + consumer.DispatchAsyncMessages(); + } + } + } + + + /// + /// Returns a copy of the current consumers in a thread safe way to avoid concurrency + /// problems if the consumers are changed in another thread + /// + protected ICollection 
GetConsumers() + { + lock (consumers.SyncRoot) + { + return new ArrayList(consumers.Values); + } + } + + protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) + { + ConsumerInfo answer = new ConsumerInfo(); + ConsumerId id = new ConsumerId(); + id.ConnectionId = info.SessionId.ConnectionId; + id.SessionId = info.SessionId.Value; + lock (this) + { + id.Value = ++consumerCounter; + } + answer.ConsumerId = id; + answer.Destination = ActiveMQDestination.Transform(destination); + answer.Selector = selector; + answer.PrefetchSize = prefetchSize; + answer.Priority = priority; + answer.Exclusive = exclusive; + answer.DispatchAsync = dispatchAsync; + answer.Retroactive = retroactive; + + // If the destination contained a URI query, then use it to set public properties + // on the ConsumerInfo + ActiveMQDestination amqDestination = destination as ActiveMQDestination; + if (amqDestination != null && amqDestination.Options != null) + { + Util.URISupport.SetProperties(answer, amqDestination.Options, "consumer."); + } + + return answer; + } + + protected virtual ProducerInfo CreateProducerInfo(IDestination destination) + { + ProducerInfo answer = new ProducerInfo(); + ProducerId id = new ProducerId(); + id.ConnectionId = info.SessionId.ConnectionId; + id.SessionId = info.SessionId.Value; + lock (this) + { + id.Value = ++producerCounter; + } + answer.ProducerId = id; + answer.Destination = ActiveMQDestination.Transform(destination); + + answer.Destination = ActiveMQDestination.Transform(destination); + + // If the destination contained a URI query, then use it to set public + // properties on the ProducerInfo + ActiveMQDestination amqDestination = destination as ActiveMQDestination; + if (amqDestination != null && amqDestination.Options != null) + { + Util.URISupport.SetProperties(answer, amqDestination.Options, "producer."); + } + + return answer; + } + + /// + /// Configures the message command + /// + protected void 
Configure(ActiveMQMessage message) + { + } } - } } Modified: incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs?view=diff&rev=465270&r1=465269&r2=465270 ============================================================================== --- incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs (original) +++ incubator/activemq/activemq-dotnet/trunk/src/main/csharp/NMS/ISession.cs Wed Oct 18 08:14:17 2006 @@ -116,34 +116,5 @@ /// void Rollback(); - - - - /// - /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers - /// until acknowledgements are received. - /// - int PrefetchSize { get; set; } - - /// - /// Enables or disables whether asynchronous dispatch should be used by the broker - /// - bool DispatchAsync { get; set; } - - /// - /// Enables or disables exclusive consumers when using queues. An exclusive consumer means - /// only one instance of a consumer is allowed to process messages on a queue to preserve order - /// - bool Exclusive { get; set; } - - /// - /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not? - /// - bool Retroactive { get; set; } - - /// - /// Sets the default consumer priority for consumers - /// - byte Priority { get; set; } } }