activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: AMQ-6361 - fix contention over expiry processing with expiry task and client expiry ack, unit test regression sorted.
Date Fri, 22 Jul 2016 14:18:56 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 2a815c2e0 -> 6cc2c1190


AMQ-6361 - fix contention over expiry processing with expiry task and client expiry ack, unit test regression sorted.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6cc2c119
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6cc2c119
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6cc2c119

Branch: refs/heads/master
Commit: 6cc2c1190da15579a9a886bad03596e14a77a677
Parents: 2a815c2
Author: gtully <gary.tully@gmail.com>
Authored: Fri Jul 22 15:18:31 2016 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Fri Jul 22 15:18:31 2016 +0100

----------------------------------------------------------------------
 .../broker/region/IndirectMessageReference.java       |  5 +++++
 .../activemq/broker/region/NullMessageReference.java  |  5 +++++
 .../activemq/broker/region/PrefetchSubscription.java  |  4 +++-
 .../apache/activemq/broker/region/RegionBroker.java   | 14 +-------------
 .../java/org/apache/activemq/broker/region/Topic.java |  8 +++++---
 .../region/cursors/FilePendingMessageCursor.java      |  2 +-
 .../activemq/broker/region/MessageReference.java      |  2 ++
 .../java/org/apache/activemq/command/Message.java     |  7 +++++++
 .../broker/region/cursors/OrderPendingListTest.java   |  5 +++++
 .../region/cursors/PrioritizedPendingListTest.java    |  5 +++++
 .../apache/activemq/usecases/ExpiredMessagesTest.java |  1 -
 11 files changed, 39 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
index c1b5f3c..104dcb0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
@@ -204,4 +204,9 @@ public class IndirectMessageReference implements QueueMessageReference {
     public boolean isAdvisory() {
        return message.isAdvisory();
     }
+
+    @Override
+    public boolean canProcessAsExpired() {
+        return message.canProcessAsExpired();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
index bef9b23..e8d26a8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java
@@ -154,4 +154,9 @@ public final class NullMessageReference implements QueueMessageReference {
         return false;
     }
 
+    @Override
+    public boolean canProcessAsExpired() {
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index bc04566..5254440 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -323,7 +323,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                     }
                     if (inAckRange) {
                         Destination regionDestination = nodeDest;
-                        regionDestination.messageExpired(context, this, node);
+                        if (broker.isExpired(node)) {
+                            regionDestination.messageExpired(context, this, node);
+                        }
                         iter.remove();
                         nodeDest.getDestinationStatistics().getInflight().decrement();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 961618d..69e0930 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -735,19 +735,7 @@ public class RegionBroker extends EmptyBroker {
 
     @Override
     public boolean isExpired(MessageReference messageReference) {
-        boolean expired = false;
-        if (messageReference.isExpired()) {
-            try {
-                // prevent duplicate expiry processing
-                Message message = messageReference.getMessage();
-                synchronized (message) {
-                    expired = stampAsExpired(message);
-                }
-            } catch (IOException e) {
-                LOG.warn("unexpected exception on message expiry determination for: {}", messageReference, e);
-            }
-        }
-        return expired;
+        return messageReference.canProcessAsExpired();
     }
 
     private boolean stampAsExpired(Message message) throws IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index c43f55e..bf803c1 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -527,9 +527,11 @@ public class Topic extends BaseDestination implements Task {
                     // It could take while before we receive the commit
                     // operation.. by that time the message could have
                     // expired..
-                    if (broker.isExpired(message)) {
-                        getDestinationStatistics().getExpired().increment();
-                        broker.messageExpired(context, message, null);
+                    if (message.isExpired()) {
+                        if (broker.isExpired(message)) {
+                            getDestinationStatistics().getExpired().increment();
+                            broker.messageExpired(context, message, null);
+                        }
                         message.decrementReferenceCount();
                         return;
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 5bd0cda..20b2bc5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -483,7 +483,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
 
     private void discardExpiredMessage(MessageReference reference) {
         LOG.debug("Discarding expired message {}", reference);
-        if (broker.isExpired(reference)) {
+        if (reference.isExpired() && broker.isExpired(reference)) {
             ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
             context.setBroker(broker);
             ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage()));

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java b/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java
index 64c91fa..50d23ed 100755
--- a/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java
+++ b/activemq-client/src/main/java/org/apache/activemq/broker/region/MessageReference.java
@@ -63,5 +63,7 @@ public interface MessageReference {
      * @return true if the message is an advisory
      */
     boolean isAdvisory();
+
+    boolean canProcessAsExpired();
     
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-client/src/main/java/org/apache/activemq/command/Message.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
index a500768..83f3201 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.DeflaterOutputStream;
 
 import javax.jms.JMSException;
@@ -94,6 +95,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
     private transient ActiveMQConnection connection;
     transient MessageDestination regionDestination;
     transient MemoryUsage memoryUsage;
+    transient AtomicBoolean processAsExpired = new AtomicBoolean(false);
 
     private BrokerId[] brokerPath;
     private BrokerId[] cluster;
@@ -837,4 +839,9 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
         }
         return super.toString(overrideFields);
     }
+
+    @Override
+    public boolean canProcessAsExpired() {
+        return processAsExpired.compareAndSet(false, true);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
index 827c39b..74c7686 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java
@@ -498,5 +498,10 @@ public class OrderPendingListTest {
         public boolean isAdvisory() {
             return false;
         }
+
+        @Override
+        public boolean canProcessAsExpired() {
+            return false;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
index 8ecc951..f95fb97 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
@@ -349,5 +349,10 @@ public class PrioritizedPendingListTest {
         public boolean isAdvisory() {
             return false;
         }
+
+        @Override
+        public boolean canProcessAsExpired() {
+            return false;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6cc2c119/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
index 0205599..0187aad 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
@@ -34,7 +34,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.jms.*;
 
-import java.io.File;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.activemq.TestSupport.getDestination;


Mime
View raw message