Author: jgomes
Date: Fri Sep 16 22:52:45 2011
New Revision: 1171843
URL: http://svn.apache.org/viewvc?rev=1171843&view=rev
Log:
Refactor to minimize the amount of time that a lock is held. This also eliminates the potential
for a deadlock to occur while dispatching a message.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs?rev=1171843&r1=1171842&r2=1171843&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
Fri Sep 16 22:52:45 2011
@@ -596,6 +596,7 @@ namespace Apache.NMS.ActiveMQ
public virtual void Dispatch(MessageDispatch dispatch)
{
MessageListener listener = this.listener;
+ bool dispatchMessage = false;
try
{
@@ -623,47 +624,52 @@ namespace Apache.NMS.ActiveMQ
{
if(listener != null && this.unconsumedMessages.Running)
{
- ActiveMQMessage message = CreateActiveMQMessage(dispatch);
-
- this.BeforeMessageIsConsumed(dispatch);
+ dispatchMessage = true;
+ }
+ else
+ {
+ this.unconsumedMessages.Enqueue(dispatch);
+ }
+ }
+ }
- try
- {
- bool expired = (!IgnoreExpiration && message.IsExpired());
+ if(dispatchMessage)
+ {
+ ActiveMQMessage message = CreateActiveMQMessage(dispatch);
- if(!expired)
- {
- listener(message);
- }
+ this.BeforeMessageIsConsumed(dispatch);
- this.AfterMessageIsConsumed(dispatch, expired);
- }
- catch(Exception e)
- {
- if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
- {
- // Redeliver the message
- }
- else
- {
- // Transacted or Client ack: Deliver the next message.
- this.AfterMessageIsConsumed(dispatch, false);
- }
+ try
+ {
+ bool expired = (!IgnoreExpiration && message.IsExpired());
- Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
+ if(!expired)
+ {
+ listener(message);
+ }
- // If aborted we stop the abort here and let normal processing resume.
- // This allows the session to shutdown normally and ack all messages
- // that have outstanding acks in this consumer.
- if( (Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
- {
- Thread.ResetAbort();
- }
- }
+ this.AfterMessageIsConsumed(dispatch, expired);
+ }
+ catch(Exception e)
+ {
+ if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
+ {
+ // Redeliver the message
}
else
{
- this.unconsumedMessages.Enqueue(dispatch);
+ // Transacted or Client ack: Deliver the next message.
+ this.AfterMessageIsConsumed(dispatch, false);
+ }
+
+ Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + e);
+
+ // If aborted we stop the abort here and let normal processing resume.
+ // This allows the session to shutdown normally and ack all messages
+ // that have outstanding acks in this consumer.
+ if((Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == ThreadState.AbortRequested)
+ {
+ Thread.ResetAbort();
}
}
}
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs?rev=1171843&r1=1171842&r2=1171843&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
Fri Sep 16 22:52:45 2011
@@ -51,10 +51,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
get
{
- lock(this.mutex)
- {
- return this.closed;
- }
+ return this.closed;
}
set
@@ -70,10 +67,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
get
{
- lock(this.mutex)
- {
- return this.running;
- }
+ return this.running;
}
set
@@ -89,10 +83,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
get
{
- lock(mutex)
- {
- return this.size == 0;
- }
+ return this.size == 0;
}
}
@@ -100,10 +91,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
get
{
- lock(mutex)
- {
- return this.size;
- }
+ return this.size;
}
}
|