activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r933240 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQMessageConsumer.java ActiveMQSession.java
Date Mon, 12 Apr 2010 14:03:19 GMT
Author: gtully
Date: Mon Apr 12 14:03:18 2010
New Revision: 933240

URL: http://svn.apache.org/viewvc?rev=933240&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2693 - loaded machine with slow thread
creation can delay interruption processing past next dispatch which can be problematic. prefetch=1
will workaround

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=933240&r1=933239&r2=933240&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon Apr 12 14:03:18 2010
@@ -144,6 +144,7 @@ public class ActiveMQMessageConsumer imp
     private ExecutorService executorService;
     private MessageTransformer transformer;
     private boolean clearDispatchList;
+    boolean inProgressClearRequiredFlag;
 
     private MessageAck pendingAck;
     private long lastDeliveredSequenceId;
@@ -655,23 +656,32 @@ public class ActiveMQMessageConsumer imp
         this.session.asyncSendPacket(removeCommand);
     }
     
-    void clearMessagesInProgress() {
+    void inProgressClearRequired() {
+        inProgressClearRequiredFlag = true;
         // deal with delivered messages async to avoid lock contention with in progress acks
         clearDispatchList = true;
-        synchronized (unconsumedMessages.getMutex()) {            
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size()
+ ") on transport interrupt");
-            }
-            // ensure unconsumed are rolledback up front as they may get redelivered to another
consumer
-            List<MessageDispatch> list = unconsumedMessages.removeAll();
-            if (!this.info.isBrowser()) {
-                for (MessageDispatch old : list) {
-                    session.connection.rollbackDuplicate(this, old.getMessage());
+    }
+    
+    void clearMessagesInProgress() {
+        if (inProgressClearRequiredFlag) {
+            synchronized (unconsumedMessages.getMutex()) {
+                if (inProgressClearRequiredFlag) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(getConsumerId() + " clearing dispatched list (" + unconsumedMessages.size()
+ ") on transport interrupt");
+                    }
+                    // ensure unconsumed are rolledback up front as they may get redelivered
to another consumer
+                    List<MessageDispatch> list = unconsumedMessages.removeAll();
+                    if (!this.info.isBrowser()) {
+                        for (MessageDispatch old : list) {
+                            session.connection.rollbackDuplicate(this, old.getMessage());
+                        }
+                    }
+                    // allow dispatch on this connection to resume
+                    session.connection.transportInterruptionProcessingComplete();
+                    inProgressClearRequiredFlag = false;
                 }
             }
         }
-        // allow dispatch on this connection to resume
-        session.connection.transportInterruptionProcessingComplete();
     }
 
     void deliverAcks() {
@@ -1192,6 +1202,7 @@ public class ActiveMQMessageConsumer imp
     public void dispatch(MessageDispatch md) {
         MessageListener listener = this.messageListener.get();
         try {
+            clearMessagesInProgress();
             clearDispatchList();
             synchronized (unconsumedMessages.getMutex()) {
                 if (!unconsumedMessages.isClosed()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=933240&r1=933239&r2=933240&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon
Apr 12 14:03:18 2010
@@ -649,6 +649,7 @@ public class ActiveMQSession implements 
         // connection.transportInterruptionProcessingComplete()
         //
         for (final ActiveMQMessageConsumer consumer : consumers) {
+            consumer.inProgressClearRequired();
             scheduler.executeAfterDelay(new Runnable() {
                 public void run() {
                     consumer.clearMessagesInProgress();



Mime
View raw message