Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 86291 invoked from network); 24 Oct 2009 15:24:28 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 24 Oct 2009 15:24:28 -0000 Received: (qmail 79403 invoked by uid 500); 24 Oct 2009 15:24:28 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 79346 invoked by uid 500); 24 Oct 2009 15:24:28 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 79337 invoked by uid 99); 24 Oct 2009 15:24:28 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 24 Oct 2009 15:24:28 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 24 Oct 2009 15:24:17 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CF9AA23888D4; Sat, 24 Oct 2009 15:23:54 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r829386 [1/2] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/ main/csharp/Commands/ main/csharp/Threads/ main/csharp/Transport/Failover/ main/csharp/Transport/Mock/ main/csharp/Util/ test/csharp/Threads/ Date: Sat, 24 Oct 2009 15:23:54 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091024152354.CF9AA23888D4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Sat Oct 24 15:23:52 2009 New Revision: 829386 URL: http://svn.apache.org/viewvc?rev=829386&view=rev Log: https://issues.apache.org/activemq/browse/AMQNET-176 Refactored Session and Consumer dispatching code with new Message acknowledgment handling to deal with transactions correctly and optimize the overall dispatch / ack cycle. Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs (with props) activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs (with props) activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Threads/DedicatedTaskRunnerTest.cs (with props) Removed: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunnerFactory.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs?rev=829386&r1=829385&r2=829386&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/BaseCommand.cs Sat Oct 24 15:23:52 2009 @@ -33,7 +33,7 @@ public abstract class BaseCommand : BaseDataStructure, Command, ICloneable { private int commandId; - private bool responseRequired; + private bool responseRequired = false; public int CommandId { Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=829386&r1=829385&r2=829386&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Sat Oct 24 15:23:52 2009 @@ -38,6 +38,7 @@ private WireFormatInfo brokerWireFormatInfo; // from broker private readonly IList sessions = ArrayList.Synchronized(new ArrayList()); private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable()); + private readonly IDictionary dispatchers = Hashtable.Synchronized(new Hashtable()); private readonly object myLock = new object(); private bool asyncSend = false; private bool alwaysSyncSend = false; @@ -183,6 +184,57 @@ get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); } } + public Uri BrokerUri + { + get { return brokerUri; } + } + + public ITransport ITransport + { + get { return transport; } + set { this.transport = value; } + } + + public TimeSpan RequestTimeout + { + get { return this.requestTimeout; } + set { this.requestTimeout = value; } + } + + public AcknowledgementMode AcknowledgementMode + { + get { return acknowledgementMode; } + set { this.acknowledgementMode = value; } + } + + public string ClientId + { + get { return info.ClientId; } + set + { + if(connected) + { + throw new NMSException("You cannot change the ClientId once the Connection is connected"); + } + info.ClientId = value; + } + } + + public ConnectionId ConnectionId + { + get { return info.ConnectionId; } + } + + public BrokerInfo BrokerInfo + { + get { return brokerInfo; } + } + + public WireFormatInfo BrokerWireFormat + { + get { return brokerWireFormatInfo; } + } + #endregion /// @@ -198,7 +250,7 @@ { foreach(Session session in sessions) { - session.StartAsyncDelivery(); + session.Start(); } } } @@ -226,7 +278,7 @@ { foreach(Session session in sessions) { - session.StopAsyncDelivery(); + session.Stop(); } } } @@ -255,7 +307,7 @@ if(IsStarted) { - session.StartAsyncDelivery(); + session.Start(); } sessions.Add(session); @@ -264,14 +316,22 @@ public void RemoveSession(Session session) { - DisposeOf(session.SessionId); - if(!this.closing) { sessions.Remove(session); } } + public void addDispatcher( ConsumerId id, IDispatcher dispatcher ) + { + this.dispatchers.Add( id, dispatcher ); + } + + public void removeDispatcher( ConsumerId id ) + { + this.dispatchers.Remove( id ); + } + public void addProducer( ProducerId id, MessageProducer producer ) { this.producers.Add( id, producer ); @@ -299,7 +359,7 @@ { foreach(Session session in sessions) { - session.Close(); + session.DoClose(); } } sessions.Clear(); @@ -308,7 +368,6 @@ { DisposeOf(ConnectionId); ShutdownInfo shutdowninfo = new ShutdownInfo(); - shutdowninfo.ResponseRequired = false; transport.Oneway(shutdowninfo); } @@ -362,59 +421,6 @@ disposed = true; } - // Properties - - public Uri BrokerUri - { - get { return brokerUri; } - } - - public ITransport ITransport - { - get { return transport; } - set { this.transport = value; } - } - - public TimeSpan RequestTimeout - { - get { return this.requestTimeout; } - set { this.requestTimeout = value; } - } - - public AcknowledgementMode AcknowledgementMode - { - get { return acknowledgementMode; } - set { this.acknowledgementMode = value; } - } - - public string ClientId - { - get { return info.ClientId; } - set - { - if(connected) - { - throw new NMSException("You cannot change the ClientId once the Connection is connected"); - } - info.ClientId = value; - } - } - - public ConnectionId ConnectionId - { - get { return info.ConnectionId; } - } - - public BrokerInfo BrokerInfo - { - get { return brokerInfo; } - } - - public WireFormatInfo BrokerWireFormat - { - get { return brokerWireFormatInfo; } - } - // Implementation methods /// @@ -469,7 +475,7 @@ } } - public void DisposeOf(DataStructure objectId) + private void DisposeOf(DataStructure objectId) { try { @@ -480,7 +486,6 @@ Tracer.Info("Asynchronously disposing of Connection."); if(connected) { - command.ResponseRequired = false; transport.Oneway(command); } } @@ -499,25 +504,6 @@ } } - /// - /// Creates a new temporary destination name - /// - public String CreateTemporaryDestinationName() - { - return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter); - } - - /// - /// Creates a new local transaction ID - /// - public LocalTransactionId CreateLocalTransactionId() - { - LocalTransactionId id = new LocalTransactionId(); - id.ConnectionId = ConnectionId; - id.Value = Interlocked.Increment(ref localTransactionCounter); - return id; - } - protected void CheckConnected() { if(closed) @@ -607,33 +593,23 @@ protected void DispatchMessage(MessageDispatch dispatch) { - bool dispatched = false; - - // Override the Message's Destination with the one from the Dispatch since in the - // case of a virtual Topic the correct destination ack is the one from the Dispatch. - // This is a bit of a hack since we should really be sending the entire dispatch to - // the Consumer. - dispatch.Message.Destination = dispatch.Destination; - dispatch.Message.ReadOnlyBody = true; - dispatch.Message.ReadOnlyProperties = true; - dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter; - - lock(sessions.SyncRoot) + lock(dispatchers.SyncRoot) { - foreach(Session session in sessions) + if(dispatchers.Contains(dispatch.ConsumerId)) { - if(session.DispatchMessage(dispatch.ConsumerId, dispatch.Message)) - { - dispatched = true; - break; - } + IDispatcher dispatcher = (IDispatcher) dispatchers[dispatch.ConsumerId]; + + dispatch.Message.ReadOnlyBody = true; + dispatch.Message.ReadOnlyProperties = true; + dispatch.Message.RedeliveryCounter = dispatch.RedeliveryCounter; + + dispatcher.Dispatch(dispatch); + + return; } } - if(!dispatched) - { - Tracer.Error("No such consumer active: " + dispatch.ConsumerId); - } + Tracer.Error("No such consumer active: " + dispatch.ConsumerId); } protected void OnKeepAliveCommand(ITransport commandTransport, KeepAliveInfo info) @@ -677,6 +653,11 @@ { Tracer.Debug("Transport has been Interrupted."); + foreach(Session session in this.sessions) + { + session.ClearMessagesInProgress(); + } + if(this.ConnectionInterruptedListener != null && !this.closing ) { try @@ -720,6 +701,25 @@ } } + /// + /// Creates a new temporary destination name + /// + public String CreateTemporaryDestinationName() + { + return info.ConnectionId.Value + ":" + Interlocked.Increment(ref temporaryDestinationCounter); + } + + /// + /// Creates a new local transaction ID + /// + public LocalTransactionId CreateLocalTransactionId() + { + LocalTransactionId id = new LocalTransactionId(); + id.ConnectionId = ConnectionId; + id.Value = Interlocked.Increment(ref localTransactionCounter); + return id; + } + protected SessionInfo CreateSessionInfo(AcknowledgementMode sessionAcknowledgementMode) { SessionInfo answer = new SessionInfo(); Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs?rev=829386&r1=829385&r2=829386&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ISynchronization.cs Sat Oct 24 15:23:52 2009 @@ -20,9 +20,9 @@ public interface ISynchronization { /// - /// Called before a commit + /// Called before a commit or rollback is applied. /// - void BeforeCommit(); + void BeforeEnd(); /// /// Called after a commit Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=829386&r1=829385&r2=829386&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Sat Oct 24 15:23:52 2009 @@ -15,6 +15,9 @@ * limitations under the License. */ using System; +using System.Threading; +using System.Collections; +using System.Collections.Generic; using Apache.NMS.ActiveMQ.Commands; using Apache.NMS; using Apache.NMS.Util; @@ -25,37 +28,50 @@ { DeliveredAck = 0, // Message delivered but not consumed PoisonAck = 1, // Message could not be processed due to poison pill but discard anyway - ConsumedAck = 2 // Message consumed, discard + ConsumedAck = 2, // Message consumed, discard + RedeliveredAck = 3, // Message has been Redelivered and is not yet poisoned. + IndividualAck = 4 // Only the given message is to be treated as consumed. } - /// /// An object capable of receiving messages from some destination /// - public class MessageConsumer : IMessageConsumer + public class MessageConsumer : IMessageConsumer, IDispatcher { - private readonly AcknowledgementMode acknowledgementMode; - private bool closed = false; - private object closedLock = new object(); - private readonly Dispatcher dispatcher = new Dispatcher(); - private readonly ConsumerInfo info; + private object closedLock = new object(); + + private readonly MessageDispatchChannel unconsumedMessages = new MessageDispatchChannel(); + private readonly LinkedList dispatchedMessages = new LinkedList(); + private readonly ConsumerInfo info; + private Session session; + + private MessageAck pendingAck = null; + + private Atomic started = new Atomic(); + private Atomic deliveringAcks = new Atomic(); + + private bool closed = false; private int maximumRedeliveryCount = 10; private int redeliveryTimeout = 500; - private Session session; - private Session ackSession; protected bool disposed = false; + private long lastDeliveredSequenceId = 0; + private int deliveredCounter = 0; + private int additionalWindowSize = 0; + private long redeliveryDelay = 0; + private int dispatchedCount = 0; + private volatile bool synchronizationRegistered = false; + private bool clearDispatchList = false; + + private const int DEFAULT_REDELIVERY_DELAY = 0; + private const int DEFAULT_MAX_REDELIVERIES = 5; + private event MessageListener listener; + // Constructor internal to prevent clients from creating an instance. - internal MessageConsumer(Session session, ConsumerInfo info, - AcknowledgementMode acknowledgementMode) + internal MessageConsumer(Session session, ConsumerInfo info) { this.session = session; this.info = info; - this.acknowledgementMode = acknowledgementMode; - if(AcknowledgementMode.AutoAcknowledge == acknowledgementMode) - { - this.ackSession = (Session) session.Connection.CreateSession(acknowledgementMode); - } } ~MessageConsumer() @@ -63,11 +79,13 @@ Dispose(false); } - internal Dispatcher Dispatcher - { - get { return this.dispatcher; } - } + #region Property Accessors + public long LastDeliveredSequenceId + { + get{ return this.lastDeliveredSequenceId; } + } + public ConsumerId ConsumerId { get { return info.ConsumerId; } @@ -85,35 +103,103 @@ set { redeliveryTimeout = value; } } + public int PrefetchSize + { + get { return this.info.PrefetchSize; } + } + + #endregion + #region IMessageConsumer Members public event MessageListener Listener { add { + CheckClosed(); + + if(this.PrefetchSize == 0) + { + throw new NMSException("Cannot set Asynchronous Listener on a Consumer with a zero Prefetch size"); + } + + bool wasStarted = this.session.Started; + + if(wasStarted == true) + { + this.session.Stop(); + } + listener += value; - session.RegisterConsumerDispatcher(dispatcher); + this.session.Redispatch(this.unconsumedMessages); + + if(wasStarted == true) + { + this.session.Start(); + } } remove { listener -= value; } } - public IMessage Receive() { - SendPullRequest(0); - return SetupAcknowledge(dispatcher.Dequeue()); - } - - public IMessage Receive(System.TimeSpan timeout) - { - SendPullRequest((long) timeout.TotalMilliseconds); - return SetupAcknowledge(dispatcher.Dequeue(timeout)); + CheckClosed(); + CheckMessageListener(); + + SendPullRequest(0); + MessageDispatch dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1)); + + if(dispatch == null) + { + return null; + } + + BeforeMessageIsConsumed(dispatch); + AfterMessageIsConsumed(dispatch, false); + + return CreateActiveMQMessage(dispatch); + } + + public IMessage Receive(TimeSpan timeout) + { + CheckClosed(); + CheckMessageListener(); + + SendPullRequest((long)timeout.TotalMilliseconds); + MessageDispatch dispatch = this.Dequeue(timeout); + + if(dispatch == null) + { + return null; + } + + Tracer.Debug("Receive got new MessageDispatch: " + dispatch); + + BeforeMessageIsConsumed(dispatch); + AfterMessageIsConsumed(dispatch, false); + + Tracer.Debug("Creating new message and returning."); + + return CreateActiveMQMessage(dispatch); } public IMessage ReceiveNoWait() { - SendPullRequest(-1); - return SetupAcknowledge(dispatcher.DequeueNoWait()); + CheckClosed(); + CheckMessageListener(); + + SendPullRequest(-1); + MessageDispatch dispatch = this.Dequeue(TimeSpan.Zero); + + if(dispatch == null) + { + return null; + } + + BeforeMessageIsConsumed(dispatch); + AfterMessageIsConsumed(dispatch, false); + + return CreateActiveMQMessage(dispatch); } public void Dispose() @@ -136,8 +222,8 @@ try { - Close(); - } + Close(); + } catch { // Ignore network errors. @@ -148,117 +234,62 @@ public void Close() { - lock(closedLock) - { - if(closed) - { - return; - } - - try - { - // wake up any pending dequeue() call on the dispatcher - dispatcher.Close(); - session.DisposeOf(info.ConsumerId); - - if(ackSession != null) - { - ackSession.Close(); - } - } - catch(Exception ex) - { - Tracer.ErrorFormat("Error during consumer close: {0}", ex); - } - - session = null; - ackSession = null; - closed = true; - } - } + if(!this.unconsumedMessages.Closed) + { + if(this.session.IsTransacted && this.session.TransactionContext.InTransaction) + { + this.session.TransactionContext.AddSynchronization(new ConsumerCloseSynchronization(this)); + } + else + { + this.DoClose(); + } + } + } + + internal void DoClose() + { + lock(this.closedLock) + { + if(!this.unconsumedMessages.Closed) + { + // Do we have any acks we need to send out before closing? + // Ack any delivered messages now. + if(!this.session.IsTransacted) + { + DeliverAcks(); + if(this.IsAutoAcknowledgeBatch) + { + Acknowledge(); + } + } + + if(!this.session.IsTransacted) + { + lock(this.dispatchedMessages) + { + dispatchedMessages.Clear(); + } + } + + this.unconsumedMessages.Close(); + this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId); + + RemoveInfo removeCommand = new RemoveInfo(); + removeCommand.ObjectId = this.info.ConsumerId; + removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId; + + this.session.Connection.Oneway(removeCommand); + this.session = null; + } + } + } #endregion - private event MessageListener listener; - - public void RedeliverRolledBackMessages() - { - dispatcher.RedeliverRolledBackMessages(); - } - - /// - /// Method Dispatch - /// - /// An ActiveMQMessage - public void Dispatch(ActiveMQMessage message) - { - if(AcknowledgementMode.AutoAcknowledge == this.acknowledgementMode) - { - MessageAck ack = CreateMessageAck(message); - Tracer.Debug("Sending AutoAck: " + ack); - message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge); - - lock(closedLock) - { - if(closed) - { - throw new ConnectionClosedException(); - } - - ackSession.Connection.Oneway(ack); - } - } - - dispatcher.Enqueue(message); - } - - /// - /// Dispatch any pending messages to the asynchronous listener - /// - internal void DispatchAsyncMessages() - { - while(listener != null) - { - IMessage message = dispatcher.DequeueNoWait(); - if(message == null) - { - break; - } - - message = SetupAcknowledge(message); - // invoke listener. Exceptions caught by the dispatcher thread - listener(message); - } - } - - protected IMessage SetupAcknowledge(IMessage message) - { - if(null == message) - { - return null; - } - - if(message is ActiveMQMessage) - { - ActiveMQMessage activeMessage = (ActiveMQMessage) message; - - if(AcknowledgementMode.ClientAcknowledge == acknowledgementMode) - { - activeMessage.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge); - } - else if(AcknowledgementMode.AutoAcknowledge != acknowledgementMode) - { - activeMessage.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge); - DoClientAcknowledge(activeMessage); - } - } - - return message; - } - protected void SendPullRequest(long timeout) { - if(this.info.PrefetchSize == 0 && this.dispatcher.isEmpty()) + if(this.info.PrefetchSize == 0 && this.unconsumedMessages.Empty) { MessagePull messagePull = new MessagePull(); messagePull.ConsumerId = this.info.ConsumerId; @@ -279,97 +310,702 @@ } } + protected void DoIndividualAcknowledge(ActiveMQMessage message) + { + // TODO + } + protected void DoNothingAcknowledge(ActiveMQMessage message) { } protected void DoClientAcknowledge(ActiveMQMessage message) { - MessageAck ack = CreateMessageAck(message); - Tracer.Debug("Sending Ack: " + ack); - lock(closedLock) - { - if(closed) - { - throw new ConnectionClosedException(); - } - - session.Connection.Oneway(ack); - } - } - - protected virtual MessageAck CreateMessageAck(Message message) - { - MessageAck ack = new MessageAck(); - ack.AckType = (int) AckType.ConsumedAck; - ack.ConsumerId = info.ConsumerId; - ack.Destination = message.Destination; - ack.FirstMessageId = message.MessageId; - ack.LastMessageId = message.MessageId; - ack.MessageCount = 1; - ack.ResponseRequired = false; - - if(session.Transacted) - { - session.DoStartTransaction(); - ack.TransactionId = session.TransactionContext.TransactionId; - session.TransactionContext.AddSynchronization( - new MessageConsumerSynchronization(this, message)); - } - return ack; - } - - public void AfterRollback(ActiveMQMessage message) - { - // lets redeliver the message again - message.RedeliveryCounter += 1; - if(message.RedeliveryCounter > MaximumRedeliveryCount) - { - // lets send back a poisoned pill - MessageAck ack = new MessageAck(); - ack.AckType = (int) AckType.PoisonAck; - ack.ConsumerId = info.ConsumerId; - ack.Destination = message.Destination; - ack.FirstMessageId = message.MessageId; - ack.LastMessageId = message.MessageId; - ack.MessageCount = 1; - session.Connection.Oneway(ack); - } - else - { - dispatcher.Redeliver(message); - } - } - } - - - // TODO maybe there's a cleaner way of creating stateful delegates to make this code neater - internal class MessageConsumerSynchronization : ISynchronization - { - private readonly MessageConsumer consumer; - private readonly Message message; - - public MessageConsumerSynchronization(MessageConsumer consumer, Message message) - { - this.message = message; - this.consumer = consumer; - } - - #region ISynchronization Members - - public void BeforeCommit() - { - } - - public void AfterCommit() - { - } + this.CheckClosed(); + Tracer.Debug("Sending Client Ack:"); + this.Acknowledge(); + } + + public void Start() + { + if(this.unconsumedMessages.Closed) + { + return; + } + + this.started.Value = true; + this.unconsumedMessages.Start(); + this.session.Executor.Wakeup(); + } + + public void Stop() + { + this.started.Value = false; + this.unconsumedMessages.Stop(); + } + + public void ClearMessagesInProgress() + { + // we are called from inside the transport reconnection logic + // which involves us clearing all the connections' consumers + // dispatch lists and clearing them + // so rather than trying to grab a mutex (which could be already + // owned by the message listener calling the send) we will just set + // a flag so that the list can be cleared as soon as the + // dispatch thread is ready to flush the dispatch list + this.clearDispatchList = true; + } + + public void DeliverAcks() + { + MessageAck ack = null; + + if(this.deliveringAcks.CompareAndSet(false, true)) + { + if(this.IsAutoAcknowledgeEach) + { + lock(this.dispatchedMessages) + { + ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck); + if(ack != null) + { + this.dispatchedMessages.Clear(); + } + else + { + ack = this.pendingAck; + this.pendingAck = null; + } + } + } + else if(pendingAck != null && pendingAck.AckType == (byte)AckType.ConsumedAck) + { + ack = pendingAck; + pendingAck = null; + } + + if(ack != null) + { + MessageAck ackToSend = ack; + + try + { + this.session.Connection.Oneway(ackToSend); + } + catch(Exception e) + { + Tracer.DebugFormat("{0} : Failed to send ack, {1}", this.info.ConsumerId, e); + } + } + else + { + this.deliveringAcks.Value = false; + } + } + } + + public void Dispatch(MessageDispatch dispatch) + { + MessageListener listener = this.listener; + + try + { + lock(this.unconsumedMessages.SyncRoot) + { + if(this.clearDispatchList) + { + // we are reconnecting so lets flush the in progress messages + this.clearDispatchList = false; + this.unconsumedMessages.Clear(); + + if(this.pendingAck != null && this.pendingAck.AckType == (byte)AckType.DeliveredAck) + { + // on resumption a pending delivered ack will be out of sync with + // re-deliveries. + Tracer.Debug("removing pending delivered ack on transport interupt: " + pendingAck); + this.pendingAck = null; + } + } + + if(!this.unconsumedMessages.Closed) + { + if(listener != null && this.unconsumedMessages.Running) + { + ActiveMQMessage message = CreateActiveMQMessage(dispatch); + + this.BeforeMessageIsConsumed(dispatch); + + try + { + bool expired = message.IsExpired(); + + if(!expired) + { + listener(message); + } + + this.AfterMessageIsConsumed(dispatch, expired); + } + catch(Exception e) + { + if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || this.session.IsIndividualAcknowledge) + { + // Redeliver the message + } + else + { + // Transacted or Client ack: Deliver the next message. + this.AfterMessageIsConsumed(dispatch, false); + } + + Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e); + } + } + else + { + this.unconsumedMessages.Enqueue(dispatch); + } + } + } + + if(++dispatchedCount % 1000 == 0) + { + dispatchedCount = 0; + Thread.Sleep(1); + } + } + catch(Exception e) + { + this.session.Connection.OnSessionException(this.session, e); + } + } + + public bool Iterate() + { + if(this.listener != null) + { + MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait(); + if(dispatch != null) + { + try + { + ActiveMQMessage message = CreateActiveMQMessage(dispatch); + BeforeMessageIsConsumed(dispatch); + listener(message); + AfterMessageIsConsumed(dispatch, false); + } + catch(NMSException e) + { + this.session.Connection.OnSessionException(this.session, e); + } + + return true; + } + } + + return false; + } + + /// + /// Used to get an enqueued message from the unconsumedMessages list. The + /// amount of time this method blocks is based on the timeout value. if + /// timeout == Timeout.Infinite then it blocks until a message is received. + /// if timeout == 0 then it it tries to not block at all, it returns a + /// message if it is available if timeout > 0 then it blocks up to timeout + /// amount of time. Expired messages will consumed by this method. + /// + /// + /// A + /// + /// + /// A + /// + private MessageDispatch Dequeue(TimeSpan timeout) + { + DateTime deadline = DateTime.Now; + + if(timeout > TimeSpan.Zero) + { + deadline = DateTime.Now + timeout; + } + + while(true) + { + MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout); + + if(dispatch == null) + { + if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed) + { + timeout = deadline < DateTime.Now ? TimeSpan.Zero : deadline - DateTime.Now; + } + else + { + return null; + } + } + else if(dispatch.Message == null) + { + return null; + } + else if(dispatch.Message.IsExpired()) + { + Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch); + + BeforeMessageIsConsumed(dispatch); + AfterMessageIsConsumed(dispatch, true); + + if(timeout > TimeSpan.Zero && !this.unconsumedMessages.Closed) + { + timeout = deadline < DateTime.Now ? TimeSpan.Zero : deadline - DateTime.Now; + } + } + else + { + Tracer.DebugFormat("{0} received message: {1}", info.ConsumerId, dispatch); + return dispatch; + } + } + } + + public void BeforeMessageIsConsumed(MessageDispatch dispatch) + { + this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId; + + if(!IsAutoAcknowledgeBatch) + { + lock(this.dispatchedMessages) + { + this.dispatchedMessages.AddFirst(dispatch); + } + + if(this.session.IsTransacted) + { + this.AckLater(dispatch, AckType.DeliveredAck); + } + } + } + + public void AfterMessageIsConsumed(MessageDispatch dispatch, bool expired) + { + if(this.unconsumedMessages.Closed) + { + return; + } + + if(expired == true) + { + lock(this.dispatchedMessages) + { + this.dispatchedMessages.Remove(dispatch); + } + + AckLater(dispatch, AckType.DeliveredAck); + } + else + { + if(this.session.IsTransacted) + { + // Do nothing. + } + else if(this.IsAutoAcknowledgeEach) + { + if(this.deliveringAcks.CompareAndSet(false, true)) + { + lock(this.dispatchedMessages) + { + if(this.dispatchedMessages.Count != 0) + { + MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck); + if(ack !=null) + { + this.dispatchedMessages.Clear(); + this.session.Connection.Oneway(ack); + } + } + } + this.deliveringAcks.Value = false; + } + } + else if(this.IsAutoAcknowledgeBatch) + { + AckLater(dispatch, AckType.ConsumedAck); + } + else if(this.session.IsClientAcknowledge || this.session.IsIndividualAcknowledge) + { + AckLater(dispatch, AckType.DeliveredAck); + } + else + { + throw new NMSException("Invalid session state."); + } + } + } + + private MessageAck MakeAckForAllDeliveredMessages(AckType type) + { + lock(this.dispatchedMessages) + { + if(this.dispatchedMessages.Count == 0) + { + return null; + } + + MessageDispatch dispatch = this.dispatchedMessages.First.Value; + MessageAck ack = new MessageAck(); + + ack.AckType = (byte)type; + ack.ConsumerId = this.info.ConsumerId; + ack.Destination = dispatch.Destination; + ack.LastMessageId = dispatch.Message.MessageId; + ack.MessageCount = this.dispatchedMessages.Count; + ack.FirstMessageId = this.dispatchedMessages.Last.Value.Message.MessageId; + + return ack; + } + } + + private void AckLater(MessageDispatch dispatch, AckType type) + { + // Don't acknowledge now, but we may need to let the broker know the + // consumer got the message to expand the pre-fetch window + if(this.session.IsTransacted) + { + this.session.DoStartTransaction(); + + if(!synchronizationRegistered) + { + this.synchronizationRegistered = true; + this.session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this)); + } + } + + this.deliveredCounter++; + + MessageAck oldPendingAck = pendingAck; + + pendingAck = new MessageAck(); + pendingAck.AckType = (byte)type; + pendingAck.ConsumerId = this.info.ConsumerId; + pendingAck.Destination = dispatch.Destination; + pendingAck.LastMessageId = dispatch.Message.MessageId; + pendingAck.MessageCount = deliveredCounter; + + if(this.session.IsTransacted && this.session.TransactionContext.InTransaction) + { + pendingAck.TransactionId = this.session.TransactionContext.TransactionId; + } + + if(oldPendingAck == null) + { + pendingAck.FirstMessageId = pendingAck.LastMessageId; + } + else if(oldPendingAck.AckType == pendingAck.AckType) + { + pendingAck.FirstMessageId = oldPendingAck.FirstMessageId; + } + else + { + Tracer.Debug("AckLater: Old Ack was not the same Ack type."); + + // old pending ack being superseded by ack of another type, if is is not a delivered + // ack and hence important, send it now so it is not lost. + if(oldPendingAck.AckType != (byte)AckType.DeliveredAck) + { + Tracer.Debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck); + this.session.Connection.Oneway(oldPendingAck); + } + else + { + Tracer.Debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck); + } + } + + if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - this.additionalWindowSize)) + { + this.session.Connection.Oneway(pendingAck); + this.pendingAck = null; + this.deliveredCounter = 0; + this.additionalWindowSize = 0; + } + } + + private void Acknowledge() + { + lock(this.dispatchedMessages) + { + // Acknowledge all messages so far. + MessageAck ack = MakeAckForAllDeliveredMessages(AckType.ConsumedAck); + + if(ack == null) + { + return; // no msgs + } + + if(this.session.IsTransacted) + { + this.session.DoStartTransaction(); + ack.TransactionId = this.session.TransactionContext.TransactionId; + } + + this.session.Connection.Oneway(ack); + this.pendingAck = null; + + // Adjust the counters + this.deliveredCounter = Math.Max(0, this.deliveredCounter - this.dispatchedMessages.Count); + this.additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count); + + if(!this.session.IsTransacted) + { + this.dispatchedMessages.Clear(); + } + } + } + + private void Acknowledge(MessageDispatch dispatch) + { + MessageAck ack = new MessageAck(); + + ack.AckType = (byte)AckType.IndividualAck; + ack.ConsumerId = this.info.ConsumerId; + ack.Destination = dispatch.Destination; + ack.LastMessageId = dispatch.Message.MessageId; + ack.MessageCount = 1; + + this.session.Connection.Oneway(ack); + lock(this.dispatchedMessages) + { + this.dispatchedMessages.Remove(dispatch); + } + } + + private void Commit() + { + lock(this.dispatchedMessages) + { + this.dispatchedMessages.Clear(); + } + + this.redeliveryDelay = 0; + } + + private void Rollback() + { + lock(this.unconsumedMessages.SyncRoot) + { + lock(this.dispatchedMessages) + { + if(this.dispatchedMessages.Count == 0) + { + return; + } + + // Only increase the redelivery delay after the first redelivery.. + MessageDispatch lastMd = this.dispatchedMessages.First.Value; + int currentRedeliveryCount = lastMd.Message.RedeliveryCounter; + + if(currentRedeliveryCount > 0) + { + redeliveryDelay = 1000; + //redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay); + } + + MessageId firstMsgId = this.dispatchedMessages.Last.Value.Message.MessageId; + + //if(redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES + // && lastMd.getMessage().getRedeliveryCounter() > redeliveryPolicy.getMaximumRedeliveries()) { + if(lastMd.Message.RedeliveryCounter > MessageConsumer.DEFAULT_MAX_REDELIVERIES) + { + // We need to NACK the messages so that they get sent to the + // DLQ. + // Acknowledge the last message. + + MessageAck ack = new MessageAck(); + + ack.AckType = (byte)AckType.PoisonAck; + ack.ConsumerId = this.info.ConsumerId; + ack.Destination = lastMd.Destination; + ack.LastMessageId = lastMd.Message.MessageId; + ack.MessageCount = this.dispatchedMessages.Count; + ack.FirstMessageId = firstMsgId; + + this.session.Connection.Oneway(ack); + + // Adjust the window size. + additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count); + + //redeliveryDelay = 0; + } + else + { + // only redelivery_ack after first delivery + if(currentRedeliveryCount > 0) + { + MessageAck ack = new MessageAck(); + + ack.AckType = (byte)AckType.RedeliveredAck; + ack.ConsumerId = this.info.ConsumerId; + ack.Destination = lastMd.Destination; + ack.LastMessageId = lastMd.Message.MessageId; + ack.MessageCount = this.dispatchedMessages.Count; + ack.FirstMessageId = firstMsgId; + + this.session.Connection.Oneway(ack); + } + + // stop the delivery of messages. + this.unconsumedMessages.Stop(); + + foreach(MessageDispatch dispatch in this.dispatchedMessages) + { + this.unconsumedMessages.EnqueueFirst(dispatch); + } + + if(redeliveryDelay > 0 && !this.unconsumedMessages.Closed) + { + DateTime deadline = DateTime.Now.AddMilliseconds(redeliveryDelay); + ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline); + } else { + Start(); + } + } + + this.deliveredCounter -= this.dispatchedMessages.Count; + this.dispatchedMessages.Clear(); + } + } + + // Only redispatch if there's an async listener otherwise a synchronous + // consumer will pull them from the local queue. + if(this.listener != null) + { + this.session.Redispatch(this.unconsumedMessages); + } + } + + private void RollbackHelper(Object arg) + { + try + { + TimeSpan waitTime = (DateTime) arg - DateTime.Now; + + if(waitTime.CompareTo(TimeSpan.Zero) > 0) + { + Thread.Sleep(waitTime); + } + + this.Start(); + } + catch(Exception e) + { + this.session.Connection.OnSessionException(this.session, e); + } + } + + private ActiveMQMessage CreateActiveMQMessage(MessageDispatch dispatch) + { + ActiveMQMessage message = dispatch.Message.Clone() as ActiveMQMessage; + + if(this.session.IsClientAcknowledge) + { + message.Acknowledger += new AcknowledgeHandler(DoClientAcknowledge); + } + else if(this.session.IsIndividualAcknowledge) + { + message.Acknowledger += new AcknowledgeHandler(DoIndividualAcknowledge); + } + else + { + message.Acknowledger += new AcknowledgeHandler(DoNothingAcknowledge); + } + + return message; + } + + private void CheckClosed() + { + if(this.unconsumedMessages.Closed) + { + throw new NMSException("The Consumer has been Closed"); + } + } + + private void CheckMessageListener() + { + if(this.listener != null) + { + throw new NMSException("Cannot set Async listeners on Consumers with a prefetch limit of zero"); + } + } + + private bool IsAutoAcknowledgeEach + { + get + { + return this.session.IsAutoAcknowledge || + (this.session.IsDupsOkAcknowledge && this.info.Destination.IsQueue); + } + } + + private bool IsAutoAcknowledgeBatch + { + get { return this.session.IsDupsOkAcknowledge && !this.info.Destination.IsQueue; } + } + + #region Nested ISyncronization Types + + class MessageConsumerSynchronization : ISynchronization + { + private readonly MessageConsumer consumer; + + public MessageConsumerSynchronization(MessageConsumer consumer) + { + this.consumer = consumer; + } + + public void BeforeEnd() + { + this.consumer.Acknowledge(); + this.consumer.synchronizationRegistered = false; + } + + public void AfterCommit() + { + this.consumer.Commit(); + this.consumer.synchronizationRegistered = false; + } + + public void AfterRollback() + { + this.consumer.Rollback(); + this.consumer.synchronizationRegistered = false; + } + } + + class ConsumerCloseSynchronization : ISynchronization + { + private readonly MessageConsumer consumer; + + public ConsumerCloseSynchronization(MessageConsumer consumer) + { + this.consumer = consumer; + } + + public void BeforeEnd() + { + } + + public void AfterCommit() + { + this.consumer.DoClose(); + } + + public void AfterRollback() + { + this.consumer.DoClose(); + } + } - public void AfterRollback() - { - consumer.AfterRollback((ActiveMQMessage) message); - } - - #endregion - } + #endregion + } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=829386&r1=829385&r2=829386&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs Sat Oct 24 15:23:52 2009 @@ -101,6 +101,23 @@ return; } + DoClose(); + RemoveInfo removeInfo = new RemoveInfo(); + removeInfo.ObjectId = this.info.ProducerId; + this.session.Connection.Oneway(removeInfo); + this.session = null; + } + } + + internal void DoClose() + { + lock(closedLock) + { + if(closed) + { + return; + } + try { session.DisposeOf(info.ProducerId); @@ -115,9 +132,8 @@ this.usage.Stop(); } - session = null; closed = true; - } + } } public void Send(IMessage message) Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=829386&r1=829385&r2=829386&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Sat Oct 24 15:23:52 2009 @@ -27,36 +27,52 @@ /// /// Default provider of ISession /// - public class Session : ISession + public class Session : ISession, IDispatcher { /// /// Private object used for synchronization, instead of public "this" /// private readonly object myLock = new object(); - private int consumerCounter; + private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable()); private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable()); - private readonly DispatchingThread dispatchingThread; - private DispatchingThread.ExceptionHandler dispatchingThread_ExceptionHandler; + + private SessionExecutor executor; + private TransactionContext transactionContext; + private Connection connection; + + private int prefetchSize; + private int maximumPendingMessageLimit; + private bool dispatchAsync; + private bool exclusive; + private bool retroactive; + private byte priority; + private readonly SessionInfo info; + private int consumerCounter; private int producerCounter; - internal bool startedAsyncDelivery = false; + private long nextDeliveryId; + private long lastDeliveredSequenceId; private bool disposed = false; private bool closed = false; private bool closing = false; - private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000); + private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout; + private AcknowledgementMode acknowledgementMode; public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) { this.connection = connection; this.info = info; this.acknowledgementMode = acknowledgementMode; - this.AsyncSend = connection.AsyncSend; this.requestTimeout = connection.RequestTimeout; this.PrefetchSize = 1000; - this.transactionContext = new TransactionContext(this); - this.dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages)); - this.dispatchingThread_ExceptionHandler = new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener); + + if(acknowledgementMode == AcknowledgementMode.Transactional) + { + this.transactionContext = new TransactionContext(this); + } + + this.executor = new SessionExecutor(this, this.consumers); } ~Session() @@ -64,11 +80,17 @@ Dispose(false); } + #region Property Accessors + /// /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers /// until acknowledgements are received. /// - public int PrefetchSize; + public int PrefetchSize + { + get{ return this.prefetchSize; } + set{ this.prefetchSize = value; } + } /// /// Sets the maximum number of messages to keep around per consumer @@ -76,35 +98,49 @@ /// will start to be evicted for slow consumers. /// Must be > 0 to enable this feature /// - public int MaximumPendingMessageLimit; + public int MaximumPendingMessageLimit + { + get{ return this.maximumPendingMessageLimit; } + set{ this.maximumPendingMessageLimit = value; } + } /// /// Enables or disables whether asynchronous dispatch should be used by the broker /// - public bool DispatchAsync; + public bool DispatchAsync + { + get{ return this.dispatchAsync; } + set{ this.dispatchAsync = value; } + } /// /// Enables or disables exclusive consumers when using queues. An exclusive consumer means /// only one instance of a consumer is allowed to process messages on a queue to preserve order /// - public bool Exclusive; + public bool Exclusive + { + get{ return this.exclusive; } + set{ this.exclusive = value; } + } /// /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not? /// - public bool Retroactive; + public bool Retroactive + { + get{ return this.retroactive; } + set{ this.retroactive = value; } + } /// /// Sets the default consumer priority for consumers /// - public byte Priority; - - /// - /// This property indicates whether or not async send is enabled. - /// - public bool AsyncSend; + public byte Priority + { + get{ return this.priority; } + set{ this.priority = value; } + } - private Connection connection; public Connection Connection { get { return this.connection; } @@ -115,12 +151,64 @@ get { return info.SessionId; } } - private TransactionContext transactionContext; public TransactionContext TransactionContext { get { return this.transactionContext; } } + public TimeSpan RequestTimeout + { + get { return this.requestTimeout; } + set { this.requestTimeout = value; } + } + + public bool Transacted + { + get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; } + } + + public AcknowledgementMode AcknowledgementMode + { + get { return this.acknowledgementMode; } + } + + public bool IsClientAcknowledge + { + get { return this.acknowledgementMode == AcknowledgementMode.ClientAcknowledge; } + } + + public bool IsAutoAcknowledge + { + get { return this.acknowledgementMode == AcknowledgementMode.AutoAcknowledge; } + } + + public bool IsDupsOkAcknowledge + { + get { return this.acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge; } + } + + public bool IsIndividualAcknowledge + { + get { return false; } + } + + public bool IsTransacted + { + get { return this.acknowledgementMode == AcknowledgementMode.Transactional; } + } + + public SessionExecutor Executor + { + get { return this.executor; } + } + + public long NextDeliveryId + { + get { return Interlocked.Increment(ref this.nextDeliveryId); } + } + + #endregion + #region ISession Members public void Dispose() @@ -164,13 +252,49 @@ try { + DoClose(); + } + catch(Exception ex) + { + Tracer.ErrorFormat("Error during session close: {0}", ex); + } + finally + { + // Make sure we attempt to inform the broker this Session is done. + RemoveInfo info = new RemoveInfo(); + info.ObjectId = this.info.SessionId; + info.LastDeliveredSequenceId = this.lastDeliveredSequenceId; + this.connection.Oneway(info); + this.connection = null; + this.closed = true; + this.closing = false; + } + } + } + + internal void DoClose() + { + lock(myLock) + { + if(this.closed) + { + return; + } + + try + { this.closing = true; - StopAsyncDelivery(); + + // Stop all message deliveries from this Session + Stop(); + lock(consumers.SyncRoot) { foreach(MessageConsumer consumer in consumers.Values) { - consumer.Close(); + consumer.DoClose(); + this.lastDeliveredSequenceId = + Math.Min(this.lastDeliveredSequenceId, consumer.LastDeliveredSequenceId); } } consumers.Clear(); @@ -179,10 +303,23 @@ { foreach(MessageProducer producer in producers.Values) { - producer.Close(); + producer.DoClose(); } } producers.Clear(); + + // If in a transaction roll it back + if(this.IsTransacted && this.transactionContext.InTransaction) + { + try + { + this.transactionContext.Rollback(); + } + catch + { + } + } + Connection.RemoveSession(this); } catch(Exception ex) @@ -191,13 +328,12 @@ } finally { - this.connection = null; this.closed = true; this.closing = false; } - } + } } - + public IMessageProducer CreateProducer() { return CreateProducer(null); @@ -213,7 +349,7 @@ { producer = new MessageProducer(this, command); producers[producerId] = producer; - this.DoSend(command); + this.connection.Oneway(command); } catch(Exception) { @@ -253,12 +389,21 @@ ConsumerId consumerId = command.ConsumerId; MessageConsumer consumer = null; + // Registered with Connection before we register at the broker. + connection.addDispatcher(consumerId, this); + try { - consumer = new MessageConsumer(this, command, this.AcknowledgementMode); + consumer = new MessageConsumer(this, command); // lets register the consumer first in case we start dispatching messages immediately consumers[consumerId] = consumer; - this.DoSend(command); + this.Connection.SyncRequest(command); + + if(this.Started) + { + consumer.Start(); + } + return consumer; } catch(Exception) @@ -285,12 +430,21 @@ command.NoLocal = noLocal; MessageConsumer consumer = null; + // Registered with Connection before we register at the broker. + connection.addDispatcher(consumerId, this); + try { - consumer = new MessageConsumer(this, command, this.AcknowledgementMode); + consumer = new MessageConsumer(this, command); // lets register the consumer first in case we start dispatching messages immediately consumers[consumerId] = consumer; - this.DoSend(command); + + if(this.Started) + { + consumer.Start(); + } + + this.connection.SyncRequest(command); } catch(Exception) { @@ -311,7 +465,7 @@ command.ConnectionId = Connection.ConnectionId; command.ClientId = Connection.ClientId; command.SubcriptionName = name; - this.DoSend(command); + this.connection.SyncRequest(command); } public IQueueBrowser CreateBrowser(IQueue queue) @@ -358,27 +512,24 @@ command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove command.Destination = (ActiveMQDestination) destination; - this.DoSend(command); + this.connection.Oneway(command); } public IMessage CreateMessage() { ActiveMQMessage answer = new ActiveMQMessage(); - Configure(answer); return answer; } public ITextMessage CreateTextMessage() { ActiveMQTextMessage answer = new ActiveMQTextMessage(); - Configure(answer); return answer; } public ITextMessage CreateTextMessage(string text) { ActiveMQTextMessage answer = new ActiveMQTextMessage(text); - Configure(answer); return answer; } @@ -419,6 +570,7 @@ "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + this.AcknowledgementMode); } + this.TransactionContext.Commit(); } @@ -430,55 +582,12 @@ "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + this.AcknowledgementMode); } + this.TransactionContext.Rollback(); - - // lets ensure all the consumers redeliver any rolled back messages - lock(consumers.SyncRoot) - { - foreach(MessageConsumer consumer in consumers.Values) - { - consumer.RedeliverRolledBackMessages(); - } - } - } - - - // Properties - - private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout; - public TimeSpan RequestTimeout - { - get { return this.requestTimeout; } - set { this.requestTimeout = value; } - } - - public bool Transacted - { - get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; } - } - - private AcknowledgementMode acknowledgementMode; - public AcknowledgementMode AcknowledgementMode - { - get { return this.acknowledgementMode; } } #endregion - private void dispatchingThread_ExceptionListener(Exception exception) - { - if(null != Connection) - { - try - { - Connection.OnSessionException(this, exception); - } - catch - { - } - } - } - protected void CreateTemporaryDestination(ActiveMQDestination tempDestination) { DestinationInfo command = new DestinationInfo(); @@ -486,25 +595,7 @@ command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add command.Destination = tempDestination; - this.DoSend(command); - } - - private void DoSend(Command message) - { - this.DoSend(message, this.RequestTimeout); - } - - private void DoSend(Command message, TimeSpan requestTimeout) - { - if(AsyncSend) - { - message.ResponseRequired = false; - Connection.Oneway(message); - } - else - { - Connection.SyncRequest(message, requestTimeout); - } + this.connection.SyncRequest(command); } public void DoSend( ActiveMQMessage message, MessageProducer producer, MemoryUsage producerWindow, TimeSpan sendTimeout ) @@ -566,9 +657,11 @@ } } - public void DisposeOf(ConsumerId objectId) + public void DisposeOf(ConsumerId objectId, long lastDeliveredSequenceId) { - Connection.DisposeOf(objectId); + connection.removeDispatcher(objectId); + this.lastDeliveredSequenceId = Math.Min(this.lastDeliveredSequenceId, lastDeliveredSequenceId); + if(!this.closing) { consumers.Remove(objectId); @@ -577,7 +670,6 @@ public void DisposeOf(ProducerId objectId) { - Connection.DisposeOf(objectId); connection.removeProducer(objectId); if(!this.closing) { @@ -585,37 +677,6 @@ } } - public bool DispatchMessage(ConsumerId consumerId, Message message) - { - bool dispatched = false; - MessageConsumer consumer = (MessageConsumer) consumers[consumerId]; - - if(consumer != null) - { - consumer.Dispatch((ActiveMQMessage) message); - dispatched = true; - } - - return dispatched; - } - - /// - /// Private method called by the dispatcher thread in order to perform - /// asynchronous delivery of queued (inbound) messages. - /// - private void DispatchAsyncMessages() - { - // lets iterate through each consumer created by this session - // ensuring that they have all pending messages dispatched - lock(consumers.SyncRoot) - { - foreach(MessageConsumer consumer in consumers.Values) - { - consumer.DispatchAsyncMessages(); - } - } - } - protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) { ConsumerInfo answer = new ConsumerInfo(); @@ -666,36 +727,79 @@ return answer; } - /// - /// Configures the message command - /// - protected void Configure(ActiveMQMessage message) + public void Stop() { + if(this.executor != null) + { + this.executor.Stop(); + } } - internal void StopAsyncDelivery() + public void Start() { - if(startedAsyncDelivery) + foreach(MessageConsumer consumer in this.consumers.Values) { - this.dispatchingThread.ExceptionListener -= this.dispatchingThread_ExceptionHandler; - dispatchingThread.Stop((int) MAX_THREAD_WAIT.TotalMilliseconds); - startedAsyncDelivery = false; + consumer.Start(); } + + if(this.executor != null) + { + this.executor.Start(); + } } - internal void StartAsyncDelivery() + public bool Started { - if(!startedAsyncDelivery) + get { - this.dispatchingThread.ExceptionListener += this.dispatchingThread_ExceptionHandler; - dispatchingThread.Start(); - startedAsyncDelivery = true; + return this.executor != null ? this.executor.Running : false; } } - internal void RegisterConsumerDispatcher(Dispatcher dispatcher) + public void Redispatch(MessageDispatchChannel channel) { - dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle); + MessageDispatch[] messages = channel.RemoveAll(); + System.Array.Reverse(messages); + + foreach(MessageDispatch message in messages) + { + this.executor.ExecuteFirst(message); + } } + + public void Dispatch(MessageDispatch dispatch) + { + if(this.executor != null) + { + this.executor.Execute(dispatch); + } + } + + public void ClearMessagesInProgress() + { + if( this.executor != null ) { + this.executor.ClearMessagesInProgress(); + } + + lock(this.consumers.SyncRoot) + { + foreach(MessageConsumer consumer in this.consumers) + { + consumer.ClearMessagesInProgress(); + } + } + } + + public void DeliverAcks() + { + lock(this.consumers.SyncRoot) + { + foreach(MessageConsumer consumer in this.consumers) + { + consumer.DeliverAcks(); + } + } + } + } } Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs?rev=829386&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs Sat Oct 24 15:23:52 2009 @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections; +using Apache.NMS.ActiveMQ.Commands; +using Apache.NMS.ActiveMQ.Util; +using Apache.NMS.ActiveMQ.Threads; + +namespace Apache.NMS.ActiveMQ +{ + public class SessionExecutor : Threads.Task + { + private MessageDispatchChannel messageQueue = new MessageDispatchChannel(); + private TaskRunner taskRunner = null; + + private Session session = null; + private IDictionary consumers = null; + + public SessionExecutor(Session session, IDictionary consumers) + { + this.session = session; + this.consumers = consumers; + } + + ~SessionExecutor() + { + try + { + Stop(); + Close(); + Clear(); + } + catch + { + } + } + + public void Execute(MessageDispatch dispatch) + { + // Add the data to the queue. + this.messageQueue.Enqueue(dispatch); + this.Wakeup(); + } + + public void ExecuteFirst(MessageDispatch dispatch) + { + // Add the data to the queue. + this.messageQueue.EnqueueFirst(dispatch); + this.Wakeup(); + } + + public void Wakeup() + { + TaskRunner taskRunner = this.taskRunner; + + lock(messageQueue.SyncRoot) + { + if(this.taskRunner == null) + { + this.taskRunner = new DedicatedTaskRunner(this); + } + + taskRunner = this.taskRunner; + } + + taskRunner.Wakeup(); + } + + public void Start() + { + if(!messageQueue.Running) + { + messageQueue.Start(); + + if(HasUncomsumedMessages) + { + this.Wakeup(); + } + } + } + + public void Stop() + { + if(messageQueue.Running) + { + messageQueue.Stop(); + TaskRunner taskRunner = this.taskRunner; + + if(taskRunner != null) + { + this.taskRunner = null; + taskRunner.Shutdown(); + } + } + } + + public void Close() + { + this.messageQueue.Close(); + } + + public void Dispatch(MessageDispatch dispatch) + { + try + { + MessageConsumer consumer = null; + + lock(this.consumers.SyncRoot) + { + if(this.consumers.Contains(dispatch.ConsumerId)) + { + consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer; + } + + // If the consumer is not available, just ignore the message. + // Otherwise, dispatch the message to the consumer. + if(consumer != null) + { + consumer.Dispatch(dispatch); + } + } + + } + catch(Exception ex) + { + Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message ); + } + } + + public bool Iterate() + { + try + { + lock(this.consumers.SyncRoot) + { + // Deliver any messages queued on the consumer to their listeners. + foreach( MessageConsumer consumer in this.consumers.Values ) + { + if(consumer.Iterate()) + { + return true; + } + } + } + + // No messages left queued on the listeners.. so now dispatch messages + // queued on the session + MessageDispatch message = messageQueue.DequeueNoWait(); + + if(message != null) + { + this.Dispatch(message); + return !messageQueue.Empty; + } + + return false; + } + catch(Exception ex) + { + Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message ); + this.session.Connection.OnSessionException(this.session, ex); + } + + return true; + } + + public void ClearMessagesInProgress() + { + this.messageQueue.Clear(); + } + + public void Clear() + { + this.messageQueue.Clear(); + } + + public MessageDispatch[] UnconsumedMessages + { + get{ return messageQueue.RemoveAll(); } + } + + public bool HasUncomsumedMessages + { + get{ return !messageQueue.Closed && messageQueue.Running && !messageQueue.Empty; } + } + + public bool Running + { + get{ return this.messageQueue.Running; } + } + + public bool Empty + { + get{ return this.messageQueue.Empty; } + } + + } +} Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs?rev=829386&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DedicatedTaskRunner.cs Sat Oct 24 15:23:52 2009 @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Threading; + +namespace Apache.NMS.ActiveMQ.Threads +{ + /// + /// A TaskRunner that dedicates a single thread to running a single Task. + /// + public class DedicatedTaskRunner : TaskRunner + { + private readonly Mutex mutex = new Mutex(); + private Thread theThread = null; + private Task task = null; + + private bool terminated = false; + private bool pending = false; + private bool shutdown = false; + + public DedicatedTaskRunner(Task task) + { + if(task == null) + { + throw new NullReferenceException("Task was null"); + } + + this.task = task; + + this.theThread = new Thread(Run); + this.theThread.Start(); + } + + ~DedicatedTaskRunner() + { + this.Shutdown(); + } + + public void Shutdown(TimeSpan timeout) + { + lock(mutex) + { + this.shutdown = true; + this.pending = true; + + Monitor.PulseAll(this.mutex); + + // Wait till the thread stops ( no need to wait if shutdown + // is called from thread that is shutting down) + if(!this.terminated) + { + Monitor.Wait(this.mutex, timeout); + } + } + } + + public void Shutdown() + { + this.Shutdown(new TimeSpan(Timeout.Infinite)); + } + + public void Wakeup() + { + lock(mutex) + { + if(this.shutdown) + { + return; + } + + this.pending = true; + + Monitor.PulseAll(this.mutex); + } + } + + internal void Run() + { + try + { + while(true) + { + lock(this.mutex) + { + pending = false; + + if(this.shutdown) + { + return; + } + } + + if(!this.task.Iterate()) + { + // wait to be notified. + lock(this.mutex) + { + if(this.shutdown) + { + return; + } + + while(!this.pending) + { + Monitor.Wait(this.mutex); + } + } + } + } + } + catch + { + } + + // Make sure we notify any waiting threads that thread + // has terminated. + lock(this.mutex) + { + this.terminated = true; + Monitor.PulseAll(this.mutex); + } + } + } +}