Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 95636 invoked from network); 17 Dec 2009 20:34:34 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 17 Dec 2009 20:34:34 -0000 Received: (qmail 24825 invoked by uid 500); 17 Dec 2009 20:34:34 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 24770 invoked by uid 500); 17 Dec 2009 20:34:34 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 24761 invoked by uid 99); 17 Dec 2009 20:34:34 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Dec 2009 20:34:33 +0000 X-ASF-Spam-Status: No, hits=-2.6 required=5.0 tests=AWL,BAYES_00 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Dec 2009 20:34:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B44F92388AA9; Thu, 17 Dec 2009 20:34:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r891876 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp: Connection.cs MessageConsumer.cs MessageProducer.cs Session.cs TransactionContext.cs Date: Thu, 17 Dec 2009 20:34:10 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091217203410.B44F92388AA9@eris.apache.org> Author: tabish Date: Thu Dec 17 20:34:10 2009 New Revision: 891876 URL: http://svn.apache.org/viewvc?rev=891876&view=rev Log: http://issues.apache.org/activemq/browse/AMQNET-220 Attempting to fix redelivery issues. 
Found some things that the client was not doing correctly when sending various messages. This submission adds the option to set a session to send acks async or sync; the session will also look at the type of session ack mode and for Transacted sessions always send the acks sync. Added the DispatchAsync option to Connection to allow the connection to specify if consumers should configure themselves with the broker as asyncDispatch or not, default is true. Also cleaned up some logging that duplicates what can be achieved by using the transport.UseLogging=true option. Still have an issue with messages getting tagged on the broker as redelivered when they actually aren't. Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs?rev=891876&r1=891875&r2=891876&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Connection.cs Thu Dec 17 20:34:10 2009 @@ -46,6 +46,8 @@ private bool asyncClose = true; private bool useCompression = false; private bool copyMessageOnSend = true; + private bool sendAcksAsync = false; + private bool dispatchAsync = true; private int producerWindowSize = 0; private bool connected = false; private bool 
closed = false; @@ -136,6 +138,17 @@ } /// + /// This property indicates whether or not async sends are used for + /// message acknowledgement messages. Sending Acks async can improve + /// performance but may decrease reliability. + /// + public bool SendAcksAsync + { + get { return sendAcksAsync; } + set { sendAcksAsync = value; } + } + + /// /// This property sets the acknowledgment mode for the connection. /// The URI parameter connection.ackmode can be set to a string value /// that maps to the enumeration value. @@ -225,6 +238,15 @@ set { this.acknowledgementMode = value; } } + /// + /// synchronously or asynchronously by the broker. + /// + public bool DispatchAsync + { + get { return this.dispatchAsync; } + set { this.dispatchAsync = value; } + } + public string ClientId { get { return info.ClientId; } @@ -338,7 +360,7 @@ { SessionInfo info = CreateSessionInfo(sessionAcknowledgementMode); SyncRequest(info, this.RequestTimeout); - Session session = new Session(this, info, sessionAcknowledgementMode); + Session session = new Session(this, info, sessionAcknowledgementMode, this.dispatchAsync); // Set properties on session using parameters prefixed with "session." 
URISupport.CompositeData c = URISupport.parseComposite(this.brokerUri); Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs?rev=891876&r1=891875&r2=891876&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageConsumer.cs Thu Dec 17 20:34:10 2009 @@ -279,7 +279,7 @@ if(!this.session.IsTransacted) { - lock(this.dispatchedMessages) + lock(this.dispatchedMessages) { dispatchedMessages.Clear(); } @@ -288,7 +288,7 @@ this.unconsumedMessages.Close(); this.session.DisposeOf(this.info.ConsumerId, this.lastDeliveredSequenceId); - RemoveInfo removeCommand = new RemoveInfo(); + RemoveInfo removeCommand = new RemoveInfo(); removeCommand.ObjectId = this.info.ConsumerId; removeCommand.LastDeliveredSequenceId = this.lastDeliveredSequenceId; @@ -352,7 +352,7 @@ ack.MessageCount = 1; Tracer.Debug("Sending Individual Ack for MessageId: " + ack.LastMessageId.ToString()); - this.session.Connection.Oneway(ack); + this.session.SendAck(ack); } protected void DoNothingAcknowledge(ActiveMQMessage message) @@ -430,7 +430,7 @@ try { - this.session.Connection.Oneway(ackToSend); + this.session.SendAck(ackToSend); } catch(Exception e) { @@ -685,7 +685,7 @@ if(ack != null) { this.dispatchedMessages.Clear(); - this.session.Connection.Oneway(ack); + this.session.SendAck(ack); } } } @@ -828,7 +828,7 @@ ack.TransactionId = this.session.TransactionContext.TransactionId; } - this.session.Connection.Oneway(ack); + this.session.SendAck(ack); this.pendingAck = null; // Adjust the counters @@ -890,7 +890,7 @@ ack.MessageCount = this.dispatchedMessages.Count; ack.FirstMessageId = firstMsgId; - 
this.session.Connection.Oneway(ack); + this.session.SendAck(ack); // Adjust the window size. additionalWindowSize = Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count); @@ -911,7 +911,7 @@ ack.MessageCount = this.dispatchedMessages.Count; ack.FirstMessageId = firstMsgId; - this.session.Connection.Oneway(ack); + this.session.SendAck(ack); } // stop the delivery of messages. Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs?rev=891876&r1=891875&r2=891876&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/MessageProducer.cs Thu Dec 17 20:34:10 2009 @@ -38,7 +38,7 @@ private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode; private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout; private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive; - private MsgPriority msgPriority = NMSConstants.defaultPriority; + private MsgPriority msgPriority = NMSConstants.defaultPriority - 1; private bool disableMessageID = false; private bool disableMessageTimestamp = false; protected bool disposed = false; @@ -133,7 +133,7 @@ } closed = true; - } + } } public void Send(IMessage message) @@ -302,12 +302,12 @@ return session.CreateBytesMessage(body); } - public IStreamMessage CreateStreamMessage() - { - return session.CreateStreamMessage(); - } + public IStreamMessage CreateStreamMessage() + { + return session.CreateStreamMessage(); + } - public void OnProducerAck(ProducerAck ack) + public void OnProducerAck(ProducerAck ack) { Tracer.Debug("Received ProducerAck for Message of Size = {" + ack.Size + "}" ); Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs?rev=891876&r1=891875&r2=891876&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/Session.cs Thu Dec 17 20:34:10 2009 @@ -33,10 +33,10 @@ /// Private object used for synchronization, instead of public "this" /// private readonly object myLock = new object(); - + private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable()); private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable()); - + private SessionExecutor executor; private TransactionContext transactionContext; private Connection connection; @@ -44,8 +44,8 @@ private bool dispatchAsync; private bool exclusive; private bool retroactive; - private byte priority; - + private byte priority = 4; + private readonly SessionInfo info; private int consumerCounter; private int producerCounter; @@ -57,12 +57,13 @@ private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout; private AcknowledgementMode acknowledgementMode; - public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) + public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode, bool dispatchAsync) { this.connection = connection; this.info = info; this.acknowledgementMode = acknowledgementMode; this.requestTimeout = connection.RequestTimeout; + this.dispatchAsync = dispatchAsync; if(acknowledgementMode == AcknowledgementMode.Transactional) { @@ -201,9 +202,9 @@ { get { return Interlocked.Increment(ref this.nextDeliveryId); } } - + #endregion - + #region ISession Members public void Dispose() @@ -284,7 
+285,7 @@ // Stop all message deliveries from this Session Stop(); - + lock(consumers.SyncRoot) { foreach(MessageConsumer consumer in consumers.Values) @@ -315,8 +316,8 @@ catch { } - } - + } + Connection.RemoveSession(this); } catch(Exception ex) @@ -328,9 +329,9 @@ this.closed = true; this.closing = false; } - } + } } - + public IMessageProducer CreateProducer() { return CreateProducer(null); @@ -400,7 +401,7 @@ { consumer.Start(); } - + return consumer; } catch(Exception) @@ -430,7 +431,7 @@ // Registered with Connection before we register at the broker. connection.addDispatcher(consumerId, this); - + try { consumer = new MessageConsumer(this, command); @@ -441,7 +442,7 @@ { consumer.Start(); } - + this.connection.SyncRequest(command); } catch(Exception) @@ -475,7 +476,7 @@ { throw new NotSupportedException("Not Yet Implemented"); } - + public IQueue GetQueue(string name) { return new ActiveMQQueue(name); @@ -548,12 +549,12 @@ return ConfigureMessage(answer) as IBytesMessage; } - public IStreamMessage CreateStreamMessage() - { - return ConfigureMessage(new ActiveMQStreamMessage()) as IStreamMessage; - } - - public IObjectMessage CreateObjectMessage(object body) + public IStreamMessage CreateStreamMessage() + { + return ConfigureMessage(new ActiveMQStreamMessage()) as IStreamMessage; + } + + public IObjectMessage CreateObjectMessage(object body) { ActiveMQObjectMessage answer = new ActiveMQObjectMessage(); answer.Body = body; @@ -568,7 +569,7 @@ "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + this.AcknowledgementMode); } - + this.TransactionContext.Commit(); } @@ -580,7 +581,7 @@ "You cannot perform a Commit() on a non-transacted session. 
Acknowlegement mode is: " + this.AcknowledgementMode); } - + this.TransactionContext.Rollback(); } @@ -659,7 +660,7 @@ { connection.removeDispatcher(objectId); this.lastDeliveredSequenceId = Math.Min(this.lastDeliveredSequenceId, lastDeliveredSequenceId); - + if(!this.closing) { consumers.Remove(objectId); @@ -699,7 +700,7 @@ { answer.PrefetchSize = this.connection.PrefetchPolicy.QueuePrefetch; } - + // If the destination contained a URI query, then use it to set public properties // on the ConsumerInfo ActiveMQDestination amqDestination = destination as ActiveMQDestination; @@ -747,11 +748,11 @@ { consumer.Start(); } - + if(this.executor != null) { this.executor.Start(); - } + } } public bool Started @@ -769,32 +770,24 @@ foreach(MessageDispatch message in messages) { - if(Tracer.IsDebugEnabled) - { - Tracer.DebugFormat("Resending Message Dispatch: ", message.ToString()); - } this.executor.ExecuteFirst(message); } } - + public void Dispatch(MessageDispatch dispatch) { if(this.executor != null) { - if(Tracer.IsDebugEnabled) - { - Tracer.DebugFormat("Send Message Dispatch: ", dispatch.ToString()); - } this.executor.Execute(dispatch); } } - internal void ClearMessagesInProgress() - { + internal void ClearMessagesInProgress() + { if( this.executor != null ) { this.executor.ClearMessagesInProgress(); } - + lock(this.consumers.SyncRoot) { foreach(MessageConsumer consumer in this.consumers) @@ -811,7 +804,7 @@ foreach(MessageConsumer consumer in this.consumers.Values) { consumer.Acknowledge(); - } + } } } @@ -828,6 +821,23 @@ return message; } + internal void SendAck(MessageAck ack) + { + this.SendAck(ack, false); + } + + internal void SendAck(MessageAck ack, bool lazy) + { + if(lazy || connection.SendAcksAsync || this.IsTransacted ) + { + this.connection.Oneway(ack); + } + else + { + this.connection.SyncRequest(ack); + } + } + /// /// Prevents message from throwing an exception if a client calls Acknoweldge on /// a message that is part of a transaction either being 
produced or consumed. The @@ -840,6 +850,6 @@ private void DoNothingAcknowledge(ActiveMQMessage message) { } - + } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs?rev=891876&r1=891875&r2=891876&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.2.x/src/main/csharp/TransactionContext.cs Thu Dec 17 20:34:10 2009 @@ -75,6 +75,11 @@ info.Type = (int) TransactionType.Begin; this.session.Connection.Oneway(info); + + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("Begin:" + this.transactionId.ToString()); + } } } @@ -87,6 +92,13 @@ this.BeforeEnd(); + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("Rollback: " + this.transactionId + + " syncCount: " + + (synchronizations != null ? synchronizations.Count : 0)); + } + TransactionInfo info = new TransactionInfo(); info.ConnectionId = this.session.Connection.ConnectionId; info.TransactionId = transactionId; @@ -107,7 +119,14 @@ } this.BeforeEnd(); - + + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("Commit: " + this.transactionId + + " syncCount: " + + (synchronizations != null ? synchronizations.Count : 0)); + } + TransactionInfo info = new TransactionInfo(); info.ConnectionId = this.session.Connection.ConnectionId; info.TransactionId = transactionId;